diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-09-05 12:10:25 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-09-05 12:10:25 +0300 |
commit | 7cf7de710c1f50c20a229d12b33d18b8f4f0f736 (patch) | |
tree | 0355b8befbe655bfe6a762b907699d82aca5fc85 | |
parent | d3935667210c5e85135757ceeed9663bc6f3d914 (diff) | |
download | ydb-7cf7de710c1f50c20a229d12b33d18b8f4f0f736.tar.gz |
Handle quota update in rate limiter
11 files changed, 346 insertions, 28 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp index 128bc20f96..750d349db3 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp @@ -188,7 +188,7 @@ public: void Handle(TEvQuotaService::TQuotaGetResponse::TPtr& ev) { CPS_LOG_D("Got response from quota service"); if (auto quotaIt = ev->Get()->Quotas.find(QUOTA_CPU_PERCENT_LIMIT); quotaIt != ev->Get()->Quotas.end()) { - CloudLimit = static_cast<double>(quotaIt->second.Limit.Value * 100 * 1000); + CloudLimit = static_cast<double>(quotaIt->second.Limit.Value * 10); // percent -> milliseconds Send(RateLimiterControlPlaneServiceId(), new TEvRateLimiter::TEvCreateResource(CloudId, FolderId, QueryId, CloudLimit, QueryLimit)); } else { ReplyWithError("CPU quota for cloud was not found"); diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h index c6e7060372..7a6755db25 100644 --- a/ydb/core/yq/libs/events/event_ids.h +++ b/ydb/core/yq/libs/events/event_ids.h @@ -52,6 +52,7 @@ struct TEventIds { EvDeleteRateLimiterResourceResponse, EvSchemaDeleted, + EvSchemaUpdated, // Special events EvEnd diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 11c9b453f2..e08b8efd45 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -264,6 +264,15 @@ struct TEvents { NYdb::TStatus Result; }; + struct TEvSchemaUpdated : public NActors::TEventLocal<TEvSchemaUpdated, TEventIds::EvSchemaUpdated> { + explicit TEvSchemaUpdated(NYdb::TStatus result) + : Result(std::move(result)) + { + } + + NYdb::TStatus Result; + }; + struct TEvCallback : public NActors::TEventLocal<TEvCallback, TEventIds::EvCallback> { explicit TEvCallback(std::function<void()> callback) : Callback(callback) diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 7f363212d7..c1d8ff91da 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -304,7 +304,7 @@ void Init( { TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_ANALYTICS_COUNT_LIMIT, 100, 1000, NYq::ControlPlaneStorageServiceActorId()), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_STREAMING_COUNT_LIMIT, 100, 1000, NYq::ControlPlaneStorageServiceActorId()), - TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_CPU_PERCENT_LIMIT, 200, 3200), + TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_CPU_PERCENT_LIMIT, 200, 3200, protoConfig.GetRateLimiter().GetControlPlaneEnabled() ? NYq::RateLimiterControlPlaneServiceId() : NActors::TActorId()), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_MEMORY_LIMIT, 0), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 0), TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_ANALYTICS_DURATION_LIMIT, 1440), diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp index 948f667b34..ba18ae9256 100644 --- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp +++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp @@ -294,6 +294,13 @@ private: auto itI = infoMap.find(metricName); if (itI != infoMap.end()) { auto& info = itI->second; + if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) { + // check hard limit only if quota is increased + if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { + limit = info.HardLimit; + } + } + if (info.QuotaController != NActors::TActorId{}) { pended = true; cache.PendingLimitRequest = ev->Sender; @@ -301,13 +308,6 @@ private: cache.PendingLimit.insert(metricName); Send(info.QuotaController, new TEvQuotaService::TQuotaLimitChangeRequest(subjectType, subjectId, metricName, cached.Usage.Limit.Value, limit)); continue; - } else { - if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) { - // check hard limit only if quota is increased - if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) { - limit = info.HardLimit; - } - } } } diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt index f382ea088b..227adce3a4 100644 --- a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC ydb-core-protos libs-config-protos yq-libs-events + libs-quota_manager-events libs-rate_limiter-events libs-rate_limiter-utils yq-libs-shared_resources @@ -26,4 +27,5 @@ target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC ) target_sources(libs-rate_limiter-control_plane_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp ) diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp index f3a1795b2e..a9a788470b 100644 --- a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp @@ -1,7 +1,9 @@ #include "rate_limiter_control_plane_service.h" +#include "update_limit_actor.h" #include <ydb/core/protos/services.pb.h> #include <ydb/core/yq/libs/events/events.h> +#include <ydb/core/yq/libs/quota_manager/events/events.h> #include <ydb/core/yq/libs/rate_limiter/events/control_plane_events.h> #include <ydb/core/yq/libs/rate_limiter/utils/path.h> #include <ydb/core/yq/libs/ydb/schema.h> @@ -50,7 +52,7 @@ ERetryErrorClass RetryFunc(const NYdb::TStatus& status) { return ERetryErrorClass::NoRetry; } -TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() { +TYdbSdkRetryPolicy::TPtr MakeSchemaRetryPolicy() { static auto policy = TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5)); return policy; } @@ -75,6 +77,11 @@ struct TRateLimiterRequestsQueue { return *this; } + TRateLimiterRequestsQueue& AddRequest(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + Queue.emplace_back(std::move(ev)); + return *this; + } + void ProcessRequests() { for (; Inflight.size() < MaxRequestsInflight && !Queue.empty(); Queue.pop_front(), NextCookie += CookieStep) { ProcessRequest(NextCookie, Inflight.emplace(NextCookie, std::move(Queue.front())).first->second); @@ -102,7 +109,12 @@ private: { } - using TOriginalRequestType = std::variant<TEvRateLimiter::TEvCreateResource::TPtr, TEvRateLimiter::TEvDeleteResource::TPtr>; + explicit TRequest(TEvQuotaService::TQuotaLimitChangeRequest::TPtr&& ev) + : OriginalRequest(std::move(ev)) + { + } + + using TOriginalRequestType = std::variant<TEvRateLimiter::TEvCreateResource::TPtr, TEvRateLimiter::TEvDeleteResource::TPtr, TEvQuotaService::TQuotaLimitChangeRequest::TPtr>; TOriginalRequestType OriginalRequest; }; @@ -121,7 +133,7 @@ private: RateLimiterPath, GetRateLimiterResourcePath(ev->Get()->CloudId, ev->Get()->Scope, ev->Get()->QueryId), {ev->Get()->CloudLimit, Nothing(), ev->Get()->QueryLimit}, - MakeCreateSchemaRetryPolicy(), + MakeSchemaRetryPolicy(), cookie ) ); @@ -136,7 +148,22 @@ private: Connection, RateLimiterPath, GetRateLimiterResourcePath(ev->Get()->CloudId, ev->Get()->Scope, ev->Get()->QueryId), - MakeCreateSchemaRetryPolicy(), + MakeSchemaRetryPolicy(), + cookie + ) + ); + } + + void ProcessRequest(ui64 cookie, TRequest& req, TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + Y_UNUSED(req); + NActors::TActivationContext::AsActorContext().Register( + MakeUpdateCloudRateLimitActor( + NActors::TActivationContext::AsActorContext().SelfID, + Connection, + RateLimiterPath, + ev->Get()->SubjectId, + ev->Get()->LimitRequested, + MakeSchemaRetryPolicy(), cookie ) ); @@ -179,6 +206,23 @@ private: ); } + void ProcessResponse(TRequest& req, TEvents::TEvSchemaUpdated::TPtr& ev) { + TEvQuotaService::TQuotaLimitChangeRequest::TPtr& originalRequest = std::get<TEvQuotaService::TQuotaLimitChangeRequest::TPtr>(req.OriginalRequest); + auto& record = *originalRequest->Get(); + NActors::TActivationContext::AsActorContext().Send( + originalRequest->Sender, + new TEvQuotaService::TQuotaLimitChangeResponse( + record.SubjectType, + record.SubjectId, + record.MetricName, + ev->Get()->Result.IsSuccess() ? record.LimitRequested : record.Limit, + record.LimitRequested + ), + 0, // flags + originalRequest->Cookie + ); + } + private: const TString RateLimiterPath; const TYdbConnectionPtr Connection; @@ -217,7 +261,7 @@ public: } void RunCreateCoordinationNodeActor(const TString& path) { - Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeCreateSchemaRetryPolicy())); + Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeSchemaRetryPolicy())); ++CreatingCoordinationNodes; } @@ -266,6 +310,10 @@ public: GetRateLimiter(ev->Get()->CloudId).AddRequest(ev); } + void HandleInit(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + GetRateLimiter(ev->Get()->SubjectId).AddRequest(ev); + } + void HandleWorking(TEvRateLimiter::TEvCreateResource::TPtr& ev) { GetRateLimiter(ev->Get()->CloudId).AddRequest(ev).ProcessRequests(); } @@ -274,6 +322,10 @@ public: GetRateLimiter(ev->Get()->CloudId).AddRequest(ev).ProcessRequests(); } + void HandleWorking(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + GetRateLimiter(ev->Get()->SubjectId).AddRequest(ev).ProcessRequests(); + } + void HandleWorking(TEvents::TEvSchemaCreated::TPtr& ev) { GetRateLimiterByCookie(ev->Cookie).OnResponse(ev); } @@ -282,6 +334,10 @@ public: GetRateLimiterByCookie(ev->Cookie).OnResponse(ev); } + void HandleWorking(TEvents::TEvSchemaUpdated::TPtr& ev) { + GetRateLimiterByCookie(ev->Cookie).OnResponse(ev); + } + void HandleOff(TEvRateLimiter::TEvCreateResource::TPtr& ev) { Send(ev->Sender, new TEvRateLimiter::TEvCreateResourceResponse("")); } @@ -290,12 +346,25 @@ public: Send(ev->Sender, new TEvRateLimiter::TEvDeleteResourceResponse(true)); } + void HandleOff(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) { + auto& record = *ev->Get(); + + Send(ev->Sender, new TEvQuotaService::TQuotaLimitChangeResponse(record.SubjectType, record.SubjectId, record.MetricName, + record.LimitRequested, record.LimitRequested)); + } + + void HandleQuotaUsage(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) { + Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(ev->Get()->SubjectType, ev->Get()->SubjectId, ev->Get()->MetricName, 0)); + } + // State func that does nothing. Rate limiting is turned off. // Answers "OK" responses and does nothing. STRICT_STFUNC( RateLimiterOffStateFunc, hFunc(TEvRateLimiter::TEvCreateResource, HandleOff); hFunc(TEvRateLimiter::TEvDeleteResource, HandleOff); + hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleOff); + hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage); ) // State func that inits limiters. @@ -305,6 +374,8 @@ public: hFunc(TEvents::TEvSchemaCreated, HandleInit); hFunc(TEvRateLimiter::TEvCreateResource, HandleInit); hFunc(TEvRateLimiter::TEvDeleteResource, HandleInit); + hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleInit); + hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage); ) // Working @@ -314,6 +385,9 @@ public: hFunc(TEvRateLimiter::TEvDeleteResource, HandleWorking); hFunc(TEvents::TEvSchemaCreated, HandleWorking); hFunc(TEvents::TEvSchemaDeleted, HandleWorking); + hFunc(TEvents::TEvSchemaUpdated, HandleWorking); + hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleWorking); + hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage); ) TRateLimiterRequestsQueue& GetRateLimiter(const TString& cloudId) { diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp new file mode 100644 index 0000000000..078a2da7e3 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp @@ -0,0 +1,119 @@ +#include "update_limit_actor.h" + +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/yq/libs/events/events.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NYq { +namespace { + +class TUpdateCloudRateLimitActor : public NActors::TActorBootstrapped<TUpdateCloudRateLimitActor> { +public: + TUpdateCloudRateLimitActor( + NActors::TActorId parent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& cloudId, + ui64 limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : Parent(parent) + , Connection(std::move(connection)) + , CoordinationNodePath(coordinationNodePath) + , CloudId(cloudId) + , Limit(limit) + , RetryPolicy(std::move(retryPolicy)) + , Cookie(cookie) + { + } + + void Bootstrap() { + Become(&TUpdateCloudRateLimitActor::StateFunc); + + Register( + MakeUpdateRateLimiterResourceActor( + SelfId(), + NKikimrServices::YQ_RATE_LIMITER, + Connection, + CoordinationNodePath, + CloudId, + Limit * 10, + RetryPolicy + ) + ); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvSchemaUpdated, Handle); + hFunc(TEvents::TEvSchemaCreated, Handle); + ); + + void Handle(TEvents::TEvSchemaUpdated::TPtr& ev) { + if (ev->Get()->Result.IsSuccess() || !IsPathDoesNotExistError(ev->Get()->Result)) { + SendResponseAndPassAway(std::move(ev->Get()->Result)); + return; + } + + // Path doesn't exist. Create it with new quota value + Register( + MakeCreateRateLimiterResourceActor( + SelfId(), + NKikimrServices::YQ_RATE_LIMITER, + Connection, + CoordinationNodePath, + CloudId, + {Limit * 10}, // percent -> milliseconds + RetryPolicy + ) + ); + } + + void Handle(TEvents::TEvSchemaCreated::TPtr& ev) { + SendResponseAndPassAway(std::move(ev->Get()->Result)); + } + + void SendResponseAndPassAway(NYdb::TStatus status) { + Send( + Parent, + new TEvents::TEvSchemaUpdated(std::move(status)), + 0, + Cookie + ); + PassAway(); + } + +private: + const NActors::TActorId Parent; + const TYdbConnectionPtr Connection; + const TString CoordinationNodePath; + const TString CloudId; + const ui64 Limit; + const TYdbSdkRetryPolicy::TPtr RetryPolicy; + const ui64 Cookie; +}; + +} // anonymous namespace + +NActors::IActor* MakeUpdateCloudRateLimitActor( + NActors::TActorId parent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& cloudId, + ui64 limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) +{ + return new TUpdateCloudRateLimitActor( + parent, + std::move(connection), + coordinationNodePath, + cloudId, + limit, + std::move(retryPolicy), + cookie + ); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h new file mode 100644 index 0000000000..fbc9135517 --- /dev/null +++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h @@ -0,0 +1,20 @@ +#pragma once +#include <ydb/core/yq/libs/ydb/schema.h> +#include <ydb/core/yq/libs/ydb/ydb.h> + +#include <library/cpp/actors/core/actor.h> + +#include <util/generic/string.h> + +namespace NYq { + +NActors::IActor* MakeUpdateCloudRateLimitActor( + NActors::TActorId parent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& cloudId, + ui64 limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie); + +} // namespace NYq diff --git a/ydb/core/yq/libs/ydb/schema.cpp b/ydb/core/yq/libs/ydb/schema.cpp index 7e4135008b..4e82cfb47a 100644 --- a/ydb/core/yq/libs/ydb/schema.cpp +++ b/ydb/core/yq/libs/ydb/schema.cpp @@ -46,19 +46,6 @@ struct TEvPrivate { }; }; -bool ParentPathDoesNotExist(const NYdb::TStatus& status) { - if (status.GetStatus() == NYdb::EStatus::NOT_FOUND) { - return true; - } - if (status.GetStatus() == NYdb::EStatus::BAD_REQUEST) { - const TString issuesText = status.GetIssues().ToString(); - if (issuesText.Contains("doesn't exist") || issuesText.Contains("does not exist")) { - return true; - } - } - return false; -} - template <class TResponseEvent> class TSchemaActorBase : public NActors::TActorBootstrapped<TSchemaActorBase<TResponseEvent>> { public: @@ -202,6 +189,30 @@ protected: } }; +class TUpdateActorBase : public TSchemaActorBase<TEvents::TEvSchemaUpdated> { +public: + using TSchemaActorBase::TSchemaActorBase; + +protected: + void Handle(TEvents::TEvSchemaUpdated::TPtr& ev) override { + if (ev->Get()->Result.IsSuccess()) { + SCHEMA_LOG_DEBUG("Successfully updated " << GetEntityName()); + ReplyAndDie(ev); + return; + } + + SCHEMA_LOG_ERROR("Update " << GetEntityName() << " error: " << ev->Get()->Result.GetStatus() << " " << ev->Get()->Result.GetIssues().ToOneLineString()); + + if (!ScheduleNextAttempt(ev)) { + ReplyAndDie(ev); + } + } + + TStringBuf GetActionName() const override { + return "update"; + } +}; + template <class TRequestDesc, class TBase = TCreateActorBase> class TRecursiveCreateActorBase : public TBase { public: @@ -228,7 +239,7 @@ protected: this->CallAndSubscribe(); return; } - if (CurrentRequest > 0 && ParentPathDoesNotExist(ev->Get()->Result)) { + if (CurrentRequest > 0 && IsPathDoesNotExistError(ev->Get()->Result)) { if (!TriedPaths[CurrentRequest - 1]) { // Defence from cycles. --CurrentRequest; TriedPaths[CurrentRequest] = true; @@ -474,8 +485,54 @@ private: const TString ResourcePath; }; +class TUpdateRateLimiterResourceActor : public TUpdateActorBase { +public: + TUpdateRateLimiterResourceActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& resourcePath, + TMaybe<ui64> limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) + : TUpdateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie) + , CoordinationNodePath(coordinationNodePath) + , ResourcePath(resourcePath) + , Limit(limit) + { + } + +private: + TString GetEntityName() const override { + return TStringBuilder() << "rate limiter resource \"" << ResourcePath << "\""; + } + + NYdb::TAsyncStatus CallYdbSdk() override { + return Connection->RateLimiterClient.AlterResource(CoordinationNodePath, ResourcePath, NYdb::NRateLimiter::TAlterResourceSettings().MaxUnitsPerSecond(Limit)); + } + +private: + const TString CoordinationNodePath; + const TString ResourcePath; + const TMaybe<ui64> Limit; +}; + } // namespace +bool IsPathDoesNotExistError(const NYdb::TStatus& status) { + if (status.GetStatus() == NYdb::EStatus::NOT_FOUND) { + return true; + } + if (status.GetStatus() == NYdb::EStatus::BAD_REQUEST) { + const TString issuesText = status.GetIssues().ToString(); + if (issuesText.Contains("doesn't exist") || issuesText.Contains("does not exist")) { + return true; + } + } + return false; +} + NActors::IActor* MakeCreateTableActor( NActors::TActorId parent, ui64 logComponent, @@ -574,4 +631,26 @@ NActors::IActor* MakeDeleteRateLimiterResourceActor( ); } +NActors::IActor* MakeUpdateRateLimiterResourceActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& resourcePath, + TMaybe<double> limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie) +{ + return new TUpdateRateLimiterResourceActor( + parent, + logComponent, + std::move(connection), + coordinationNodePath, + resourcePath, + limit, + std::move(retryPolicy), + cookie + ); +} + } // namespace NYq diff --git a/ydb/core/yq/libs/ydb/schema.h b/ydb/core/yq/libs/ydb/schema.h index e13660a59e..b685d41d1b 100644 --- a/ydb/core/yq/libs/ydb/schema.h +++ b/ydb/core/yq/libs/ydb/schema.h @@ -63,4 +63,18 @@ NActors::IActor* MakeDeleteRateLimiterResourceActor( TYdbSdkRetryPolicy::TPtr retryPolicy, ui64 cookie = 0); +// Actor that updates rate limiter resource. +// Sends TEvSchemaUpdated to parent (if any). +NActors::IActor* MakeUpdateRateLimiterResourceActor( + NActors::TActorId parent, + ui64 logComponent, + TYdbConnectionPtr connection, + const TString& coordinationNodePath, + const TString& resourcePath, + TMaybe<double> limit, + TYdbSdkRetryPolicy::TPtr retryPolicy, + ui64 cookie = 0); + +bool IsPathDoesNotExistError(const NYdb::TStatus& status); + } // namespace NYq |