diff options
author | hor911 <hor911@ydb.tech> | 2022-08-01 13:33:20 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-08-01 13:33:20 +0300 |
commit | 5cbf4c3a664cee3d79b1b449b252fd28c373ee98 (patch) | |
tree | d584ee10f5854c3d23a5476157e1e37dd6133835 | |
parent | 08c896760519c0f6e107dc6afcc227a21e812be8 (diff) | |
download | ydb-5cbf4c3a664cee3d79b1b449b252fd28c373ee98.tar.gz |
Quota DB persistance
20 files changed, 628 insertions, 140 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 139c1595f4..f93658dc33 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -771,6 +771,7 @@ add_subdirectory(ydb/core/yq/libs/common) add_subdirectory(ydb/core/yq/libs/control_plane_storage/events) add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto) add_subdirectory(ydb/core/yq/libs/quota_manager/events) +add_subdirectory(ydb/core/yq/libs/quota_manager/proto) add_subdirectory(ydb/core/yq/libs/control_plane_storage) add_subdirectory(library/cpp/protobuf/interop) add_subdirectory(ydb/core/yq/libs/config) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 51a9ab4284..49fb1cec4e 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -775,6 +775,7 @@ add_subdirectory(ydb/core/yq/libs/common) add_subdirectory(ydb/core/yq/libs/control_plane_storage/events) add_subdirectory(ydb/core/yq/libs/control_plane_storage/proto) add_subdirectory(ydb/core/yq/libs/quota_manager/events) +add_subdirectory(ydb/core/yq/libs/quota_manager/proto) add_subdirectory(ydb/core/yq/libs/control_plane_storage) add_subdirectory(library/cpp/protobuf/interop) add_subdirectory(ydb/core/yq/libs/config) diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 69f74043b1..8f27f228f1 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -301,9 +301,10 @@ enum EServiceKikimr { YQ_AUDIT = 1150; YQ_AUDIT_EVENT_SENDER = 1151; YQ_HEALTH = 1152; - YQ_RATE_LIMITER = 1155; FQ_INTERNAL_SERVICE = 1153; FQ_QUOTA_SERVICE = 1154; + YQ_RATE_LIMITER = 1155; + FQ_QUOTA_PROXY = 1156; // 1024 - 1099 is reserved for nbs diff --git a/ydb/core/yq/libs/config/protos/quotas_manager.proto b/ydb/core/yq/libs/config/protos/quotas_manager.proto index e3119141d8..8defdaee6d 100644 --- a/ydb/core/yq/libs/config/protos/quotas_manager.proto +++ b/ydb/core/yq/libs/config/protos/quotas_manager.proto @@ -20,6 +20,7 @@ message TQuotaList { message TQuotasManagerConfig { bool Enabled = 1; repeated TQuotaList Quotas = 2; - string UsageRefreshPeriod = 3; - bool EnablePermissions = 4; + string LimitRefreshPeriod = 3; + string UsageRefreshPeriod = 4; + bool EnablePermissions = 5; } diff --git a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp index d0c8190729..d85c0352d5 100644 --- a/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -103,7 +103,7 @@ public: void Bootstrap() { CPP_LOG_T("Get quotas bootstrap. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId()); Become(&TGetQuotaActor::StateFunc, TDuration::Seconds(10), new NActors::TEvents::TEvWakeup()); - Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId)); + Send(MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId)); } STRICT_STFUNC(StateFunc, @@ -120,7 +120,7 @@ public: void HandleTimeout() { CPP_LOG_D("Quota request timeout. Cloud id: " << Event->Get()->CloudId << " Actor id: " << SelfId()); - Send(MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId, true)); + Send(MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(SUBJECT_TYPE_CLOUD, Event->Get()->CloudId, true)); } }; diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt index ba0038e4a5..a1fa82c7c3 100644 --- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt +++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt @@ -27,6 +27,7 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC libs-control_plane_storage-proto yq-libs-db_schema libs-graph_params-proto + libs-quota_manager-events yq-libs-shared_resources yq-libs-ydb ydb-library-security diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index 64dfa936f7..33b9ede7a4 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -75,6 +75,8 @@ namespace NYq { #define SUBJECT_ID_COLUMN_NAME "subject_id" #define METRIC_NAME_COLUMN_NAME "metric_name" #define METRIC_LIMIT_COLUMN_NAME "metric_limit" +#define LIMIT_UPDATED_AT_COLUMN_NAME "limit_updated_at" #define METRIC_USAGE_COLUMN_NAME "metric_usage" +#define USAGE_UPDATED_AT_COLUMN_NAME "usage_updated_at" } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 94c6280621..a20039f000 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -46,7 +46,7 @@ void TYdbControlPlaneStorageActor::Bootstrap() { CreateResultSetsTable(); CreateJobsTable(); CreateNodesTable(); - // CreateQuotasTable(); // not yet + CreateQuotasTable(); Become(&TThis::StateFunc); } @@ -240,8 +240,10 @@ void TYdbControlPlaneStorageActor::CreateQuotasTable() .AddNullableColumn(SUBJECT_TYPE_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(SUBJECT_ID_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(METRIC_NAME_COLUMN_NAME, EPrimitiveType::String) - .AddNullableColumn(METRIC_LIMIT_COLUMN_NAME, EPrimitiveType::Int64) - .AddNullableColumn(METRIC_USAGE_COLUMN_NAME, EPrimitiveType::Int64) + .AddNullableColumn(METRIC_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64) + .AddNullableColumn(LIMIT_UPDATED_AT_COLUMN_NAME, EPrimitiveType::Timestamp) + .AddNullableColumn(METRIC_USAGE_COLUMN_NAME, EPrimitiveType::Uint64) + .AddNullableColumn(USAGE_UPDATED_AT_COLUMN_NAME, EPrimitiveType::Timestamp) .SetPrimaryKeyColumns({SUBJECT_TYPE_COLUMN_NAME, SUBJECT_ID_COLUMN_NAME, METRIC_NAME_COLUMN_NAME}) .Build(); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index dec8d678b7..7b2354e021 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -80,9 +80,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery const TEvControlPlaneStorage::TEvCreateQueryRequest& event = *ev->Get(); const TString cloudId = event.CloudId; auto it = event.Quotas.find(QUOTA_RESULT_LIMIT); - ui64 resultLimit = (it != event.Quotas.end()) ? it->second.Limit : 0; + ui64 resultLimit = (it != event.Quotas.end()) ? it->second.Limit.Value : 0; auto exec_ttl_it = event.Quotas.find(QUOTA_TIME_LIMIT); - ui64 executionLimitMills = (exec_ttl_it != event.Quotas.end()) ? exec_ttl_it->second.Limit : 0; + ui64 executionLimitMills = (exec_ttl_it != event.Quotas.end()) ? exec_ttl_it->second.Limit.Value : 0; const TString scope = event.Scope; TRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, RTS_CREATE_QUERY); requestCounters->InFly->Inc(); @@ -118,8 +118,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery auto& quota = it->second; if (!quota.Usage) { issues.AddIssue(MakeErrorIssue(TIssuesIds::NOT_READY, "Control Plane is not ready yet. Please retry later.")); - } else if (*quota.Usage >= quota.Limit) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", *quota.Usage, quota.Limit))); + } else if (quota.Usage->Value >= quota.Limit.Value) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::QUOTA_EXCEEDED, Sprintf("Too many queries (%lu of %lu). Please delete other queries or increase limits.", quota.Usage->Value, quota.Limit.Value))); } } } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 5141c31a36..c8d11788c9 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -284,14 +284,16 @@ void Init( if (protoConfig.GetQuotasManager().GetEnabled()) { auto quotaService = NYq::CreateQuotaServiceActor( protoConfig.GetQuotasManager(), - /* yqSharedResources, */ + protoConfig.GetControlPlaneStorage().GetStorage(), + yqSharedResources, + credentialsProviderFactory, serviceCounters.Counters, { TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 20_MB, 2_GB), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_COUNT_LIMIT, 100, 200, NYq::ControlPlaneStorageServiceActorId()), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_TIME_LIMIT, 0) }); - actorRegistrator(NYq::MakeQuotaServiceActorId(), quotaService); + actorRegistrator(NYq::MakeQuotaServiceActorId(nodeId), quotaService); auto quotaProxy = NYq::CreateQuotaProxyActor( protoConfig.GetQuotasManager(), diff --git a/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt index 0b39cb10db..2ef3752564 100644 --- a/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt +++ b/ydb/core/yq/libs/quota_manager/events/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(libs-quota_manager-events PUBLIC contrib-libs-cxxsupp yutil yq-libs-events + libs-quota_manager-proto ) target_sources(libs-quota_manager-events PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/events/events.cpp diff --git a/ydb/core/yq/libs/quota_manager/events/events.cpp b/ydb/core/yq/libs/quota_manager/events/events.cpp index b4d595d7a6..f065785565 100644 --- a/ydb/core/yq/libs/quota_manager/events/events.cpp +++ b/ydb/core/yq/libs/quota_manager/events/events.cpp @@ -1 +1,14 @@ -#include "events.h"
\ No newline at end of file +#include "events.h" + +namespace NYq { + +void TQuotaUsage::Merge(const TQuotaUsage& other) { + if (other.Limit.UpdatedAt > Limit.UpdatedAt) { + Limit = other.Limit; + } + if (!Usage || (other.Usage && other.Usage->UpdatedAt > Usage->UpdatedAt)) { + Usage = other.Usage; + } +} + +} /* NYq */
\ No newline at end of file diff --git a/ydb/core/yq/libs/quota_manager/events/events.h b/ydb/core/yq/libs/quota_manager/events/events.h index 0b87393bfe..21c1e06175 100644 --- a/ydb/core/yq/libs/quota_manager/events/events.h +++ b/ydb/core/yq/libs/quota_manager/events/events.h @@ -12,6 +12,8 @@ #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/event_local.h> +#include <ydb/core/yq/libs/quota_manager/proto/quota_internal.pb.h> + namespace NYq { constexpr auto SUBJECT_TYPE_CLOUD = "cloud"; @@ -43,15 +45,39 @@ struct TQuotaDescription { } }; -struct TQuotaUsage { - ui64 Limit; - TMaybe<ui64> Usage; +template <typename T> +struct TTimedValue { + T Value; TInstant UpdatedAt; + TTimedValue() = default; + TTimedValue(const TTimedValue&) = default; + TTimedValue(T value, const TInstant& updatedAt = TInstant::Zero()) : Value(value), UpdatedAt(updatedAt) {} +}; + +using TTimedUint64 = TTimedValue<ui64>; + +struct TQuotaUsage { + TTimedUint64 Limit; + TMaybe<TTimedUint64> Usage; TQuotaUsage() = default; TQuotaUsage(const TQuotaUsage&) = default; - TQuotaUsage(ui64 limit) : Limit(limit), UpdatedAt(TInstant::Zero()) {} - TQuotaUsage(ui64 limit, ui64 usage, const TInstant& updatedAt = Now()) - : Limit(limit), Usage(usage), UpdatedAt(updatedAt) {} + TQuotaUsage(ui64 limit, const TInstant& limitUpdatedAt = Now()) : Limit(limit, limitUpdatedAt) {} + TQuotaUsage(ui64 limit, const TInstant& limitUpdatedAt, ui64 usage, const TInstant& usageUpdatedAt = Now()) + : Limit(limit, limitUpdatedAt), Usage(NMaybe::TInPlace{}, usage, usageUpdatedAt) {} + void Merge(const TQuotaUsage& other); + TString ToString() { + return (Usage ? std::to_string(Usage->Value) : "*") + "/" + std::to_string(Limit.Value); + } + TString ToString(const TString& subjectType, const TString& subjectId, const TString& metricName) { + TStringBuilder builder; + builder << subjectType << "." << subjectId << "." << metricName << "=" << ToString(); + return builder; + } + TString ToString(const TString& metricName) { + TStringBuilder builder; + builder << metricName << "=" << ToString(); + return builder; + } }; using TQuotaMap = THashMap<TString, TQuotaUsage>; @@ -73,6 +99,7 @@ struct TEvQuotaService { EvQuotaSetResponse, EvQuotaLimitChangeRequest, EvQuotaLimitChangeResponse, + EvQuotaUpdateNotification, EvEnd, }; @@ -216,6 +243,15 @@ struct TEvQuotaService { {} }; + struct TEvQuotaUpdateNotification : public NActors::TEventPB<TEvQuotaUpdateNotification, + Fq::Quota::EvQuotaUpdateNotification, EvQuotaUpdateNotification> { + + TEvQuotaUpdateNotification() = default; + TEvQuotaUpdateNotification(const Fq::Quota::EvQuotaUpdateNotification& protoMessage) + : NActors::TEventPB<TEvQuotaUpdateNotification, Fq::Quota::EvQuotaUpdateNotification, EvQuotaUpdateNotification>(protoMessage) { + + } + }; }; } /* NYq */ diff --git a/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt b/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt new file mode 100644 index 0000000000..6e92863aed --- /dev/null +++ b/ydb/core/yq/libs/quota_manager/proto/CMakeLists.txt @@ -0,0 +1,31 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(libs-quota_manager-proto) +target_link_libraries(libs-quota_manager-proto PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(libs-quota_manager-proto PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto +) +target_proto_addincls(libs-quota_manager-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(libs-quota_manager-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto b/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto new file mode 100644 index 0000000000..9d32ac9690 --- /dev/null +++ b/ydb/core/yq/libs/quota_manager/proto/quota_internal.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Fq.Quota; + +import "google/protobuf/timestamp.proto"; + +//////////////////////////////////////////////////////////// + +message EvQuotaUpdateNotification { + string subject_type = 1; + string subject_id = 2; + string metric_name = 3; + uint64 metric_limit = 4; + google.protobuf.Timestamp limit_updated_at = 5; + uint64 metric_usage = 6; + google.protobuf.Timestamp usage_updated_at = 7; +} diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp index 94eb9529a8..68ec8812e7 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp @@ -1,11 +1,13 @@ #include "quota_manager.h" #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/interconnect/interconnect_impl.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/core/yq/libs/control_plane_storage/util.h> +#include <ydb/core/yq/libs/shared_resources/db_exec.h> #include <ydb/core/yq/libs/shared_resources/shared_resources.h> #include <ydb/core/protos/services.pb.h> @@ -18,51 +20,136 @@ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) #define LOG_D(stream) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) +#define LOG_T(stream) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) namespace NYq { -NActors::TActorId MakeQuotaServiceActorId() { +NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId) { constexpr TStringBuf name = "FQ_QUOTA"; - return NActors::TActorId(0, name); + return NActors::TActorId(nodeId, name); } +constexpr TDuration LIMIT_REFRESH_PERIOD = TDuration::Minutes(1); constexpr TDuration USAGE_REFRESH_PERIOD = TDuration::Seconds(10); struct TQuotaCachedUsage { TQuotaUsage Usage; TInstant RequestedAt = TInstant::Zero(); + bool SyncInProgress = false; + bool ChangedAfterSync = false; + TQuotaCachedUsage() = default; TQuotaCachedUsage(ui64 limit) : Usage(limit) {} - TQuotaCachedUsage(ui64 limit, ui64 usage, const TInstant& updatedAt) - : Usage(limit, usage, updatedAt) {} + TQuotaCachedUsage(ui64 limit, const TInstant& limitUpdatedAt, ui64 usage, const TInstant& usageUpdatedAt) + : Usage(limit, limitUpdatedAt, usage, usageUpdatedAt) {} }; struct TQuotaCache { THashMap<NActors::TActorId /* Sender */, ui64 /* Cookie */> PendingRequests; THashMap<TString /* MetricName */, TQuotaCachedUsage> UsageMap; THashSet<TString> PendingUsage; + TInstant LoadedAt = TInstant::Zero(); }; +struct TReadQuotaState { + TString SubjectType; + TString SubjectId; + THashMap<TString /* MetricName */, TQuotaUsage> UsageMap; +}; + +using TReadQuotaExecuter = TDbExecuter<TReadQuotaState>; + +struct TSyncQuotaState { + TString SubjectType; + TString SubjectId; + TString MetricName; + TQuotaUsage Usage; + bool Refreshed = false; +}; + +using TSyncQuotaExecuter = TDbExecuter<TSyncQuotaState>; + +TString ToString(const std::vector<ui32>& v) { + if (v.empty()) { + return "[]"; + } + TStringBuilder builder; + for (auto i : v) { + if (builder.empty()) { + builder << "[" << i; + } else { + builder << ", " << i; + } + if (builder.size() > 1024) { + builder << "..."; + break; + } + } + builder << "]"; + return builder; +} + +TString ToString(const TQuotaMap& quota){ + if (quota.empty()) { + return "{}"; + } + TStringBuilder builder; + for (auto p : quota) { + builder << (builder.empty() ? "{" : ", ") << p.second.ToString(p.first); + if (builder.size() > 1024) { + builder << "..."; + break; + } + } + builder << "}"; + return builder; +} + +TString ToString(const THashMap<TString, TQuotaCachedUsage>& usageMap){ + if (usageMap.empty()) { + return "{}"; + } + TStringBuilder builder; + for (auto p : usageMap) { + builder << (builder.empty() ? "{" : ", ") << p.second.Usage.ToString(p.first); + if (builder.size() > 1024) { + builder << "..."; + break; + } + } + builder << "}"; + return builder; +} + class TQuotaManagementService : public NActors::TActorBootstrapped<TQuotaManagementService> { public: TQuotaManagementService( const NConfig::TQuotasManagerConfig& config, - /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */ + const NConfig::TYdbStorageConfig& storageConfig, + const TYqSharedResources::TPtr& yqSharedResources, + NKikimr::TYdbCredentialsProviderFactory credProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, std::vector<TQuotaDescription> quotaDescriptions) : Config(config) - , ServiceCounters(counters->GetSubgroup("subsystem", "QuotaService")) + , StorageConfig(storageConfig) + , YqSharedResources(yqSharedResources) + , CredProviderFactory(credProviderFactory) + , ServiceCounters(counters->GetSubgroup("subsystem", "quota_manager")) { - /* Y_UNUSED(yqSharedResources); */ for (auto& description : quotaDescriptions) { QuotaInfoMap[description.SubjectType].emplace(description.MetricName, description.Info); } + LimitRefreshPeriod = GetDuration(Config.GetLimitRefreshPeriod(), LIMIT_REFRESH_PERIOD); UsageRefreshPeriod = GetDuration(Config.GetUsageRefreshPeriod(), USAGE_REFRESH_PERIOD); } static constexpr char ActorName[] = "FQ_QUOTA_SERVICE"; void Bootstrap() { + YdbConnection = NewYdbConnection(StorageConfig, CredProviderFactory, YqSharedResources->CoreYdbDriver); + DbPool = YqSharedResources->DbPoolHolder->GetOrCreate(EDbPoolId::MAIN, 10, YdbConnection->TablePathPrefix); + Send(GetNameserviceActorId(), new NActors::TEvInterconnect::TEvListNodes()); Become(&TQuotaManagementService::StateFunc); LOG_I("STARTED"); } @@ -73,59 +160,90 @@ private: hFunc(TEvQuotaService::TQuotaChangeNotification, Handle) hFunc(TEvQuotaService::TQuotaUsageResponse, Handle) hFunc(TEvQuotaService::TQuotaSetRequest, Handle) + hFunc(TEvents::TEvCallback, [](TEvents::TEvCallback::TPtr& ev) { ev->Get()->Callback(); } ); + hFunc(NActors::TEvInterconnect::TEvNodesInfo, Handle) + hFunc(TEvQuotaService::TEvQuotaUpdateNotification, Handle) + hFunc(NActors::TEvents::TEvUndelivered, Handle) ); + void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { + LOG_I("UNDELIVERED to Peer " << ev->Sender.NodeId() << ", " << ev->Get()->Reason); + } + + void Handle(NActors::TEvInterconnect::TEvNodesInfo::TPtr& ev) { + auto oldPeerCount = NodeIds.size(); + NodeIds.clear(); + auto selfNodeId = SelfId().NodeId(); + for (auto node : ev->Get()->Nodes) { + if (node.NodeId != selfNodeId) { + NodeIds.push_back(node.NodeId); + } + } + *ServiceCounters->GetCounter("PeerCount") = NodeIds.size(); + if (oldPeerCount != NodeIds.size()) { + LOG_D("IC Peers[" << NodeIds.size() << "]: " << ToString(NodeIds)); + } + TActivationContext::Schedule(TDuration::Seconds(NodeIds.empty() ? 1 : 5), new IEventHandle(GetNameserviceActorId(), SelfId(), new NActors::TEvInterconnect::TEvListNodes())); + } + void Handle(TEvQuotaService::TQuotaGetRequest::TPtr& ev) { auto subjectType = ev->Get()->SubjectType; auto subjectId = ev->Get()->SubjectId; auto& subjectMap = QuotaCacheMap[subjectType]; auto& infoMap = QuotaInfoMap[subjectType]; - if (subjectId.empty()) { // Just get defaults + if (subjectId.empty()) { // Just reply with defaults auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); response->SubjectType = subjectType; for (auto& it : infoMap) { response->Quotas.emplace(it.first, TQuotaUsage(it.second.DefaultLimit)); } + LOG_T(subjectType << ".<defaults>: " << ToString(response->Quotas)); Send(ev->Sender, response.Release()); return; } auto it = subjectMap.find(subjectId); - bool pended = false; - // Load into cache, if needed if (it == subjectMap.end()) { - TQuotaCache cache; - // 1. Load from Config - for (const auto& quota : Config.GetQuotas()) { - if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) { - for (const auto& limit : quota.GetLimit()) { - cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit())); - } - } + LOG_D(subjectType << "." << subjectId << " NOT CASHED, Loading ..."); + } else { + if (it->second.LoadedAt + LimitRefreshPeriod < Now()) { + LOG_D(subjectType << "." << subjectId << " FORCE CASHE RELOAD, Loading ..."); + it = subjectMap.end(); } - // 2. Append defaults - for (auto& it : infoMap) { - if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) { - cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit)); + } + + if (it == subjectMap.end()) { + ReadQuota(subjectType, subjectId, + [this, ev=ev](TReadQuotaExecuter& executer) { + // This block is executed in correct self-context, no locks/syncs required + auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType]; + auto& cache = subjectMap[executer.State.SubjectId]; + LOG_D(executer.State.SubjectType << "." << executer.State.SubjectId << ToString(cache.UsageMap) << " LOADED"); + CheckUsageMaybeReply(executer.State.SubjectType, executer.State.SubjectId, cache, ev); } - } - // 3. Load from DB - subjectMap.emplace(subjectId, cache); + ); + } else { + CheckUsageMaybeReply(subjectType, subjectId, it->second, ev); } + } - auto& cache = subjectMap[subjectId]; + void CheckUsageMaybeReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaGetRequest::TPtr& ev) { + + bool pended = false; + auto& infoMap = QuotaInfoMap[subjectType]; if (!ev->Get()->AllowStaleUsage) { // Refresh usage for (auto& itUsage : cache.UsageMap) { auto metricName = itUsage.first; auto& cachedUsage = itUsage.second; - if (cachedUsage.Usage.UpdatedAt + UsageRefreshPeriod < Now()) { + if (!cachedUsage.Usage.Usage || cachedUsage.Usage.Usage->UpdatedAt + UsageRefreshPeriod < Now()) { auto it = infoMap.find(metricName); if (it != infoMap.end()) { if (it->second.UsageUpdater != NActors::TActorId{}) { if (!cache.PendingUsage.contains(metricName)) { + LOG_T(subjectType << "." << subjectId << "." << metricName << " IS STALE, Refreshing ..."); Send(it->second.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(subjectType, subjectId, metricName)); cache.PendingUsage.insert(metricName); cachedUsage.RequestedAt = Now(); @@ -141,17 +259,165 @@ private: } if (!pended) { - auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); - response->SubjectType = subjectType; - response->SubjectId = subjectId; - for (auto it : cache.UsageMap) { - response->Quotas.emplace(it.first, it.second.Usage); - } - Send(ev->Sender, response.Release()); + SendQuota(ev->Sender, ev->Cookie, subjectType, subjectId, cache); cache.PendingRequests.erase(ev->Sender); } } + void ChangeLimitsAndReply(const TString& subjectType, const TString& subjectId, TQuotaCache& cache, const TEvQuotaService::TQuotaSetRequest::TPtr& ev) { + + auto& infoMap = QuotaInfoMap[subjectType]; + for (auto metricLimit : ev->Get()->Limits) { + auto& metricName = metricLimit.first; + + auto it = cache.UsageMap.find(metricName); + if (it != cache.UsageMap.end()) { + auto& cached = it->second; + auto limit = metricLimit.second; + if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) { + // check hard limit only if quota is increased + auto itI = infoMap.find(metricName); + if (itI != infoMap.end()) { + auto& info = itI->second; + if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { + limit = info.HardLimit; + } + } + } + if (cached.Usage.Limit.Value != limit) { + cached.Usage.Limit.Value = limit; + cached.Usage.Limit.UpdatedAt = Now(); + LOG_T(cached.Usage.ToString(subjectType, subjectId, metricName) << " LIMIT Changed"); + SyncQuota(subjectType, subjectId, metricName, cached); + } + } + } + + auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); + for (auto it : cache.UsageMap) { + response->Limits.emplace(it.first, it.second.Usage.Limit.Value); + } + Send(ev->Sender, response.Release()); + } + + + void ReadQuota(const TString& subjectType, const TString& subjectId, TReadQuotaExecuter::TCallback callback) { + + TDbExecutable::TPtr executable; + auto& executer = TReadQuotaExecuter::Create(executable, false, nullptr); + + executer.State.SubjectType = subjectType; + executer.State.SubjectId = subjectId; + + executer.Read( + [](TReadQuotaExecuter& executer, TSqlQueryBuilder& builder) { + builder.AddText( + "SELECT `" METRIC_NAME_COLUMN_NAME "`, `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME "`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`\n" + "FROM `" QUOTAS_TABLE_NAME "`\n" + "WHERE `" SUBJECT_TYPE_COLUMN_NAME "` = $subject\n" + " AND `" SUBJECT_ID_COLUMN_NAME "` = $id\n" + ); + builder.AddString("subject", executer.State.SubjectType); + builder.AddString("id", executer.State.SubjectId); + }, + [](TReadQuotaExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) { + TResultSetParser parser(resultSets.front()); + while (parser.TryNextRow()) { + auto name = *parser.ColumnParser(METRIC_NAME_COLUMN_NAME).GetOptionalString(); + auto& quotaUsage = executer.State.UsageMap[name]; + quotaUsage.Limit.Value = *parser.ColumnParser(METRIC_LIMIT_COLUMN_NAME).GetOptionalUint64(); + quotaUsage.Limit.UpdatedAt = *parser.ColumnParser(LIMIT_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp(); + auto usage = parser.ColumnParser(METRIC_USAGE_COLUMN_NAME).GetOptionalUint64(); + if (usage) { + quotaUsage.Usage.ConstructInPlace(); + quotaUsage.Usage->Value = *usage; + quotaUsage.Usage->UpdatedAt = *parser.ColumnParser(USAGE_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp(); + } + } + }, + "ReadQuotas" + ).Process(SelfId(), + [this, callback=callback](TReadQuotaExecuter& executer) { + auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType]; + auto& cache = subjectMap[executer.State.SubjectId]; + + LOG_T(executer.State.SubjectType << "." << executer.State.SubjectId << " " << ToString(executer.State.UsageMap) << " FROM DB"); + + // 1. Fill from DB + for (auto& itUsage : executer.State.UsageMap) { + cache.UsageMap[itUsage.first].Usage = itUsage.second; + } + + // 2. Append from Config + for (const auto& quota : this->Config.GetQuotas()) { + if (quota.GetSubjectType() == executer.State.SubjectType && quota.GetSubjectId() == executer.State.SubjectId) { + for (const auto& limit : quota.GetLimit()) { + if (cache.UsageMap.find(limit.GetName()) == cache.UsageMap.end()) { + cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit())); + } + } + } + } + + // 3. Append defaults + auto& infoMap = this->QuotaInfoMap[executer.State.SubjectType]; + for (auto& it : infoMap) { + if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) { + cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit)); + } + } + + cache.LoadedAt = Now(); + + if (callback) { + callback(executer); + } + } + ); + + Exec(DbPool, executable); + } + + void SendQuota(NActors::TActorId receivedId, ui64 cookie, const TString& subjectType, const TString& subjectId, TQuotaCache& cache) { + auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); + response->SubjectType = subjectType; + response->SubjectId = subjectId; + for (auto it : cache.UsageMap) { + response->Quotas.emplace(it.first, it.second.Usage); + } + LOG_T(subjectType << "." << subjectId << ToString(response->Quotas) << " SEND QUOTAS"); + Send(receivedId, response.Release(), 0, cookie); + } + + void Handle(TEvQuotaService::TEvQuotaUpdateNotification::TPtr& ev) { + auto& record = ev->Get()->Record; + TQuotaUsage usage(record.metric_limit(), NProtoInterop::CastFromProto(record.limit_updated_at())); + if (record.has_usage_updated_at()) { + usage.Usage.ConstructInPlace(); + usage.Usage->Value = record.metric_usage(); + usage.Usage->UpdatedAt = NProtoInterop::CastFromProto(record.usage_updated_at()); + } + LOG_T(usage.ToString(record.subject_type(), record.subject_id(), record.metric_name()) << " UPDATE from Peer " << ev->Sender.NodeId()); + UpdateQuota(record.subject_type(), record.subject_id(), record.metric_name(), usage); + } + + void NotifyClusterNodes(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaUsage& usage) { + LOG_T(usage.ToString(subjectType, subjectId, metricName) << " NOTIFY CHANGE"); + for (auto nodeId : NodeIds) { + Fq::Quota::EvQuotaUpdateNotification notification; + notification.set_subject_type(subjectType); + notification.set_subject_id(subjectId); + notification.set_metric_name(metricName); + notification.set_metric_limit(usage.Limit.Value); + *notification.mutable_limit_updated_at() = NProtoInterop::CastToProto(usage.Limit.UpdatedAt); + if (usage.Usage) { + notification.set_metric_usage(usage.Usage->Value); + *notification.mutable_usage_updated_at() = NProtoInterop::CastToProto(usage.Usage->UpdatedAt); + } + Send(MakeQuotaServiceActorId(nodeId), new TEvQuotaService::TEvQuotaUpdateNotification(notification), IEventHandle::FlagTrackDelivery); + } + } + void Handle(TEvQuotaService::TQuotaChangeNotification::TPtr& ev) { auto& request = *ev->Get(); auto itt = QuotaInfoMap.find(request.SubjectType); @@ -161,12 +427,129 @@ private: if (itm != metricMap.end()) { auto& info = itm->second; if (info.UsageUpdater != NActors::TActorId{}) { + LOG_T(request.SubjectType << "." << request.SubjectId << "." << request.MetricName << " FORCE UPDATE, Updating ..."); Send(info.UsageUpdater, new TEvQuotaService::TQuotaUsageRequest(request.SubjectType, request.SubjectId, request.MetricName)); } } } } + void SyncQuota(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaCachedUsage& cached) { + + if (cached.SyncInProgress) { + cached.ChangedAfterSync = true; + return; + } + + TDbExecutable::TPtr executable; + auto& executer = TSyncQuotaExecuter::Create(executable, false, nullptr); + + executer.State.SubjectType = subjectType; + executer.State.SubjectId = subjectId; + executer.State.MetricName = metricName; + executer.State.Usage = cached.Usage; + + cached.ChangedAfterSync = false; + cached.SyncInProgress = true; + + executer.Read( + [](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) { + builder.AddText( + "SELECT `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME"`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`\n" + "FROM `" QUOTAS_TABLE_NAME "`\n" + "WHERE `" SUBJECT_TYPE_COLUMN_NAME "` = $subject\n" + " AND `" SUBJECT_ID_COLUMN_NAME "` = $id\n" + " AND `" METRIC_NAME_COLUMN_NAME "` = $metric\n" + ); + builder.AddString("subject", executer.State.SubjectType); + builder.AddString("id", executer.State.SubjectId); + builder.AddString("metric", executer.State.MetricName); + }, + [](TSyncQuotaExecuter& executer, const TVector<NYdb::TResultSet>& resultSets) { + TResultSetParser parser(resultSets.front()); + if (parser.TryNextRow()) { + auto limitUpdatedAt = parser.ColumnParser(LIMIT_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp(); + if (limitUpdatedAt && *limitUpdatedAt > executer.State.Usage.Limit.UpdatedAt) { + // DB changed since last read, use it and ignore local changes + executer.State.Usage.Limit.Value = *parser.ColumnParser(METRIC_LIMIT_COLUMN_NAME).GetOptionalUint64(); + executer.State.Usage.Limit.UpdatedAt = *limitUpdatedAt; + executer.State.Refreshed = true; + } + auto usageUpdatedAt = parser.ColumnParser(USAGE_UPDATED_AT_COLUMN_NAME).GetOptionalTimestamp(); + if (usageUpdatedAt && (!executer.State.Usage.Usage || *usageUpdatedAt > executer.State.Usage.Usage->UpdatedAt)) { + if (!executer.State.Usage.Usage) { + executer.State.Usage.Usage.ConstructInPlace(); + } + executer.State.Usage.Usage->Value = *parser.ColumnParser(METRIC_USAGE_COLUMN_NAME).GetOptionalUint64(); + executer.State.Usage.Usage->UpdatedAt = *usageUpdatedAt; + } + } + if (!executer.State.Refreshed) { + executer.Write( + [](TSyncQuotaExecuter& executer, TSqlQueryBuilder& builder) { + builder.AddText( + "UPSERT INTO `" QUOTAS_TABLE_NAME "` (`" SUBJECT_TYPE_COLUMN_NAME "`, `" SUBJECT_ID_COLUMN_NAME "`, `" METRIC_NAME_COLUMN_NAME "`, `" METRIC_LIMIT_COLUMN_NAME "`, `" LIMIT_UPDATED_AT_COLUMN_NAME "`, `" METRIC_USAGE_COLUMN_NAME "`, `" USAGE_UPDATED_AT_COLUMN_NAME "`)\n" + ); + builder.AddText( + executer.State.Usage.Usage ? + "VALUES ($subject, $id, $metric, $limit, $limit_updated_at, $usage, $usage_updated_at);\n" + : "VALUES ($subject, $id, $metric, $limit, $limit_updated_at, NULL, NULL);\n" + ); + builder.AddString("subject", executer.State.SubjectType); + builder.AddString("id", executer.State.SubjectId); + builder.AddString("metric", executer.State.MetricName); + builder.AddUint64("limit", executer.State.Usage.Limit.Value); + builder.AddTimestamp("limit_updated_at", executer.State.Usage.Limit.UpdatedAt); + if (executer.State.Usage.Usage) { + builder.AddUint64("usage", executer.State.Usage.Usage->Value); + builder.AddTimestamp("usage_updated_at", executer.State.Usage.Usage->UpdatedAt); + } + }, + "WriteQuota" + ); + } + }, + "CheckQuota" + ).Process(SelfId(), + [this](TSyncQuotaExecuter& executer) { + if (executer.State.Refreshed) { + UpdateQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage); + } else { + this->NotifyClusterNodes(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, executer.State.Usage); + } + + auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType]; + auto it = subjectMap.find(executer.State.SubjectId); + if (it != subjectMap.end()) { + auto& cache = it->second; + auto itQ = cache.UsageMap.find(executer.State.MetricName); + if (itQ != cache.UsageMap.end()) { + itQ->second.SyncInProgress = false; + if (itQ->second.ChangedAfterSync) { + LOG_T(itQ->second.Usage.ToString(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName) << " RESYNC"); + SyncQuota(executer.State.SubjectType, executer.State.SubjectId, executer.State.MetricName, itQ->second); + } + } + } + } + ); + + Exec(DbPool, executable); + } + + void UpdateQuota(const TString& subjectType, const TString& subjectId, const TString& metricName, TQuotaUsage& usage) { + auto& subjectMap = QuotaCacheMap[subjectType]; + auto it = subjectMap.find(subjectId); + if (it != subjectMap.end()) { + auto& cache = it->second; + auto itQ = cache.UsageMap.find(metricName); + if (itQ != cache.UsageMap.end()) { + itQ->second.Usage.Merge(usage); + LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " MERGED"); + } + } + } + void Handle(TEvQuotaService::TQuotaUsageResponse::TPtr& ev) { auto subjectType = ev->Get()->SubjectType; auto subjectId = ev->Get()->SubjectId; @@ -175,6 +558,7 @@ private: auto it = subjectMap.find(subjectId); if (it == subjectMap.end()) { + // if quotas are not cached - ignore usage update return; } @@ -182,96 +566,66 @@ private: cache.PendingUsage.erase(metricName); auto itQ = cache.UsageMap.find(metricName); - if (itQ != cache.UsageMap.end()) { - itQ->second.Usage.Usage = ev->Get()->Usage; + if (itQ == cache.UsageMap.end()) { + // if metric is not defined - ignore usage update + return; } + itQ->second.Usage.Usage = ev->Get()->Usage; + LOG_T(itQ->second.Usage.ToString(subjectType, subjectId, metricName) << " REFRESHED"); if (cache.PendingUsage.size() == 0) { for (auto& itR : cache.PendingRequests) { - auto response = MakeHolder<TEvQuotaService::TQuotaGetResponse>(); - response->SubjectType = subjectType; - response->SubjectId = subjectId; - for (auto it : cache.UsageMap) { - response->Quotas.emplace(it.first, it.second.Usage); - } - Send(itR.first, response.Release()); + SendQuota(itR.first, itR.second, subjectType, subjectId, cache); } cache.PendingRequests.clear(); } + + SyncQuota(subjectType, subjectId, metricName, itQ->second); } void Handle(TEvQuotaService::TQuotaSetRequest::TPtr& ev) { auto subjectType = ev->Get()->SubjectType; auto subjectId = ev->Get()->SubjectId; auto& subjectMap = QuotaCacheMap[subjectType]; - auto& infoMap = QuotaInfoMap[subjectType]; auto it = subjectMap.find(subjectId); - - // Load into cache, if needed if (it == subjectMap.end()) { - TQuotaCache cache; - // 1. Load from Config - for (const auto& quota : Config.GetQuotas()) { - if (quota.GetSubjectType() == subjectType && quota.GetSubjectId() == subjectId) { - for (const auto& limit : quota.GetLimit()) { - cache.UsageMap.emplace(limit.GetName(), TQuotaCachedUsage(limit.GetLimit())); - } - } - } - // 2. Load from DB (TBD) - // 3. Append defaults - for (auto& it : infoMap) { - if (cache.UsageMap.find(it.first) == cache.UsageMap.end()) { - cache.UsageMap.emplace(it.first, TQuotaCachedUsage(it.second.DefaultLimit)); - } - } - bool _; - std::tie(it, _) = subjectMap.emplace(subjectId, cache); - } - - auto& cache = it->second; - - for (auto metricLimit : ev->Get()->Limits) { - auto& name = metricLimit.first; - auto it = cache.UsageMap.find(name); - if (it != cache.UsageMap.end()) { - auto& cached = it->second; - auto limit = metricLimit.second; - if (cached.Usage.Limit == 0 || limit == 0 || limit > cached.Usage.Limit) { - // check hard limit only if quota is increased - auto itI = infoMap.find(name); - if (itI != infoMap.end()) { - auto& info = itI->second; - if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { - limit = info.HardLimit; - } - } + ReadQuota(subjectType, subjectId, + [this, ev=ev](TReadQuotaExecuter& executer) { + // This block is executed in correct self-context, no locks/syncs required + auto& subjectMap = this->QuotaCacheMap[executer.State.SubjectType]; + auto& cache = subjectMap[executer.State.SubjectId]; + LOG_D(executer.State.SubjectType << "." << executer.State.SubjectId << ToString(cache.UsageMap) << " LOADED"); + ChangeLimitsAndReply(executer.State.SubjectType, executer.State.SubjectId, cache, ev); } - cached.Usage.Limit = limit; - } + ); + } else { + ChangeLimitsAndReply(subjectType, subjectId, it->second, ev); } - - auto response = MakeHolder<TEvQuotaService::TQuotaSetResponse>(subjectType, subjectId); - for (auto it : cache.UsageMap) { - response->Limits.emplace(it.first, it.second.Usage.Limit); - } - Send(ev->Sender, response.Release()); } NConfig::TQuotasManagerConfig Config; + NConfig::TYdbStorageConfig StorageConfig; + ::NYq::TYqSharedResources::TPtr YqSharedResources; + NKikimr::TYdbCredentialsProviderFactory CredProviderFactory; + TYdbConnectionPtr YdbConnection; + TDbPool::TPtr DbPool; const ::NMonitoring::TDynamicCounterPtr ServiceCounters; THashMap<TString /* SubjectType */, THashMap<TString /* MetricName */, TQuotaInfo>> QuotaInfoMap; THashMap<TString /* SubjectType */, THashMap<TString /* SubjectId */, TQuotaCache>> QuotaCacheMap; + TDuration LimitRefreshPeriod; TDuration UsageRefreshPeriod; + std::vector<ui32> NodeIds; }; NActors::IActor* CreateQuotaServiceActor( const NConfig::TQuotasManagerConfig& config, - /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */ + const NConfig::TYdbStorageConfig& storageConfig, + const TYqSharedResources::TPtr& yqSharedResources, + NKikimr::TYdbCredentialsProviderFactory credProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, std::vector<TQuotaDescription> quotaDesc) { - return new TQuotaManagementService(config, /* yqSharedResources, */ counters, quotaDesc); + return new TQuotaManagementService(config, storageConfig, yqSharedResources, credProviderFactory, counters, quotaDesc); } } /* NYq */ diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.h b/ydb/core/yq/libs/quota_manager/quota_manager.h index b596123ece..9338067411 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.h +++ b/ydb/core/yq/libs/quota_manager/quota_manager.h @@ -32,11 +32,13 @@ namespace NYq { -NActors::TActorId MakeQuotaServiceActorId(); +NActors::TActorId MakeQuotaServiceActorId(ui32 nodeId); NActors::IActor* CreateQuotaServiceActor( const NConfig::TQuotasManagerConfig& config, - /* const NYq::TYqSharedResources::TPtr& yqSharedResources, */ + const NConfig::TYdbStorageConfig& storageConfig, + const TYqSharedResources::TPtr& yqSharedResources, + NKikimr::TYdbCredentialsProviderFactory credProviderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, std::vector<TQuotaDescription> quotaDesc); diff --git a/ydb/core/yq/libs/quota_manager/quota_proxy.cpp b/ydb/core/yq/libs/quota_manager/quota_proxy.cpp index 7e7394b80e..e6cb9f205b 100644 --- a/ydb/core/yq/libs/quota_manager/quota_proxy.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_proxy.cpp @@ -12,13 +12,15 @@ #include <ydb/core/protos/services.pb.h> #define LOG_E(stream) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream) #define LOG_W(stream) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream) #define LOG_I(stream) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream) #define LOG_D(stream) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_SERVICE, stream) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream) +#define LOG_T(stream) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::FQ_QUOTA_PROXY, stream) namespace NYq { @@ -36,7 +38,7 @@ public: void Bootstrap() { Become(&TQuotaProxyGetRequestActor::StateFunc); - Send(NYq::MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaGetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId)); + Send(NYq::MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaGetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId)); } STRICT_STFUNC(StateFunc, @@ -58,7 +60,7 @@ public: void Bootstrap() { Become(&TQuotaProxySetRequestActor::StateFunc); - Send(NYq::MakeQuotaServiceActorId(), new TEvQuotaService::TQuotaSetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId, Ev->Get()->Limits)); + Send(NYq::MakeQuotaServiceActorId(SelfId().NodeId()), new TEvQuotaService::TQuotaSetRequest(Ev->Get()->SubjectType, Ev->Get()->SubjectId, Ev->Get()->Limits)); } STRICT_STFUNC(StateFunc, diff --git a/ydb/core/yq/libs/shared_resources/CMakeLists.txt b/ydb/core/yq/libs/shared_resources/CMakeLists.txt index 2aade5a890..ff3008d5ab 100644 --- a/ydb/core/yq/libs/shared_resources/CMakeLists.txt +++ b/ydb/core/yq/libs/shared_resources/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(yq-libs-shared_resources PUBLIC ydb-core-protos libs-control_plane_storage-proto yq-libs-events + libs-quota_manager-events libs-shared_resources-interface ydb-library-logger ydb-library-security diff --git a/ydb/core/yq/libs/shared_resources/db_exec.h b/ydb/core/yq/libs/shared_resources/db_exec.h index a2b4343544..bf3a2287e7 100644 --- a/ydb/core/yq/libs/shared_resources/db_exec.h +++ b/ydb/core/yq/libs/shared_resources/db_exec.h @@ -71,10 +71,16 @@ inline TAsyncStatus Exec(TDbPool::TPtr dbPool, TDbExecutable::TPtr executable) { template <typename TState> class TDbExecuter : public TDbExecutable { +public: + using TCallback = std::function<void(TDbExecuter<TState>&)>; + using TBuildCallback = std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)>; + using TResultCallback = std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)>; + +private: struct TExecStep { - std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> BuilderCallback; - std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> HandlerCallback; - std::function<void(TDbExecuter<TState>&)> ProcessCallback; + TBuildCallback BuildCallback; + TResultCallback ResultCallback; + TCallback ProcessCallback; TString Name; bool Commit = false; }; @@ -84,15 +90,20 @@ class TDbExecuter : public TDbExecutable { NActors::TActorId HandlerActorId; TMaybe<TTransaction> Transaction; NActors::TActorSystem* ActorSystem = nullptr; - std::function<void(TDbExecuter<TState>&)> HandlerCallback; - std::function<void(TDbExecuter<TState>&)> StateInitCallback; + TCallback HandlerCallback; + TCallback StateInitCallback; bool skipStep = false; protected: - TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback = nullptr) + TDbExecuter(bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback) : TDbExecutable(collectDebugInfo), StateInitCallback(stateInitCallback) { } + TDbExecuter(bool collectDebugInfo) + : TDbExecutable(collectDebugInfo) { + StateInitCallback = [](TDbExecuter<TState>& executer) { executer.State = TState{}; }; + } + TDbExecuter(const TDbExecuter& other) = delete; public: @@ -105,6 +116,12 @@ public: return *executer; }; + static TDbExecuter& Create(TDbExecutable::TPtr& holder, bool collectDebugInfo, std::function<void(TDbExecuter<TState>&)> stateInitCallback) { + auto executer = new TDbExecuter(collectDebugInfo, stateInitCallback); + holder.reset(executer); + return *executer; + }; + void SkipStep() { skipStep = true; } @@ -121,7 +138,11 @@ public: TCommitTransactionResult result = future.GetValue(); auto status = static_cast<TStatus>(result); - return this->NextStep(session); + if (!status.IsSuccess()) { + return MakeFuture(status); + } else { + return this->NextStep(session); + } }); } if (HandlerActorId != NActors::TActorId{}) { @@ -136,7 +157,7 @@ public: } else { TSqlQueryBuilder builder(DbPool->TablePathPrefix, Steps[CurrentStepIndex].Name); skipStep = false; - Steps[CurrentStepIndex].BuilderCallback(*this, builder); + Steps[CurrentStepIndex].BuildCallback(*this, builder); if (skipStep) { // TODO Refactor this this->CurrentStepIndex++; @@ -170,9 +191,9 @@ public: this->Transaction = result.GetTransaction(); } - if (this->Steps[CurrentStepIndex].HandlerCallback) { + if (this->Steps[CurrentStepIndex].ResultCallback) { try { - this->Steps[CurrentStepIndex].HandlerCallback(*this, result.GetResultSets()); + this->Steps[CurrentStepIndex].ResultCallback(*this, result.GetResultSets()); } catch (const TControlPlaneStorageException& exception) { NYql::TIssue issue = MakeErrorIssue(exception.Code, exception.GetRawMessage()); Issues.AddIssue(issue); @@ -200,34 +221,32 @@ public: TAsyncStatus Execute(NYdb::NTable::TSession& session) override { if (StateInitCallback) { StateInitCallback(*this); - } else { - State = TState{}; } return NextStep(session); } TDbExecuter& Read( - std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback - , std::function<void(TDbExecuter<TState>&, const TVector<NYdb::TResultSet>&)> handlerCallback + TBuildCallback buildCallback + , TResultCallback resultCallback , const TString& Name = "DefaultReadName" , bool commit = false ) { - Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{builderCallback, handlerCallback, nullptr, Name, commit}); + Steps.emplace(Steps.begin() + InsertStepIndex, TExecStep{buildCallback, resultCallback, nullptr, Name, commit}); InsertStepIndex++; return *this; } TDbExecuter& Write( - std::function<void(TDbExecuter<TState>&, TSqlQueryBuilder&)> builderCallback + TBuildCallback buildCallback , const TString& Name = "DefaultWriteName" , bool commit = false ) { - return Read(builderCallback, nullptr, Name, commit); + return Read(buildCallback, nullptr, Name, commit); } void Process( NActors::TActorId actorId - , std::function<void(TDbExecuter<TState>&)> handlerCallback + , TCallback handlerCallback ) { Y_VERIFY(HandlerActorId == NActors::TActorId{}, "Handler must be empty"); ActorSystem = TActivationContext::ActorSystem(); |