aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-09-05 12:10:25 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-09-05 12:10:25 +0300
commit7cf7de710c1f50c20a229d12b33d18b8f4f0f736 (patch)
tree0355b8befbe655bfe6a762b907699d82aca5fc85
parentd3935667210c5e85135757ceeed9663bc6f3d914 (diff)
downloadydb-7cf7de710c1f50c20a229d12b33d18b8f4f0f736.tar.gz
Handle quota update in rate limiter
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp2
-rw-r--r--ydb/core/yq/libs/events/event_ids.h1
-rw-r--r--ydb/core/yq/libs/events/events.h9
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/core/yq/libs/quota_manager/quota_manager.cpp14
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt2
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp84
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp119
-rw-r--r--ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h20
-rw-r--r--ydb/core/yq/libs/ydb/schema.cpp107
-rw-r--r--ydb/core/yq/libs/ydb/schema.h14
11 files changed, 346 insertions, 28 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
index 128bc20f96..750d349db3 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/rate_limiter_resources.cpp
@@ -188,7 +188,7 @@ public:
void Handle(TEvQuotaService::TQuotaGetResponse::TPtr& ev) {
CPS_LOG_D("Got response from quota service");
if (auto quotaIt = ev->Get()->Quotas.find(QUOTA_CPU_PERCENT_LIMIT); quotaIt != ev->Get()->Quotas.end()) {
- CloudLimit = static_cast<double>(quotaIt->second.Limit.Value * 100 * 1000);
+ CloudLimit = static_cast<double>(quotaIt->second.Limit.Value * 10); // percent -> milliseconds
Send(RateLimiterControlPlaneServiceId(), new TEvRateLimiter::TEvCreateResource(CloudId, FolderId, QueryId, CloudLimit, QueryLimit));
} else {
ReplyWithError("CPU quota for cloud was not found");
diff --git a/ydb/core/yq/libs/events/event_ids.h b/ydb/core/yq/libs/events/event_ids.h
index c6e7060372..7a6755db25 100644
--- a/ydb/core/yq/libs/events/event_ids.h
+++ b/ydb/core/yq/libs/events/event_ids.h
@@ -52,6 +52,7 @@ struct TEventIds {
EvDeleteRateLimiterResourceResponse,
EvSchemaDeleted,
+ EvSchemaUpdated,
// Special events
EvEnd
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index 11c9b453f2..e08b8efd45 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -264,6 +264,15 @@ struct TEvents {
NYdb::TStatus Result;
};
+ struct TEvSchemaUpdated : public NActors::TEventLocal<TEvSchemaUpdated, TEventIds::EvSchemaUpdated> {
+ explicit TEvSchemaUpdated(NYdb::TStatus result)
+ : Result(std::move(result))
+ {
+ }
+
+ NYdb::TStatus Result;
+ };
+
struct TEvCallback : public NActors::TEventLocal<TEvCallback, TEventIds::EvCallback> {
explicit TEvCallback(std::function<void()> callback)
: Callback(callback)
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 7f363212d7..c1d8ff91da 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -304,7 +304,7 @@ void Init(
{
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_ANALYTICS_COUNT_LIMIT, 100, 1000, NYq::ControlPlaneStorageServiceActorId()),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_STREAMING_COUNT_LIMIT, 100, 1000, NYq::ControlPlaneStorageServiceActorId()),
- TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_CPU_PERCENT_LIMIT, 200, 3200),
+ TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_CPU_PERCENT_LIMIT, 200, 3200, protoConfig.GetRateLimiter().GetControlPlaneEnabled() ? NYq::RateLimiterControlPlaneServiceId() : NActors::TActorId()),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_MEMORY_LIMIT, 0),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_RESULT_LIMIT, 0),
TQuotaDescription(SUBJECT_TYPE_CLOUD, QUOTA_ANALYTICS_DURATION_LIMIT, 1440),
diff --git a/ydb/core/yq/libs/quota_manager/quota_manager.cpp b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
index 948f667b34..ba18ae9256 100644
--- a/ydb/core/yq/libs/quota_manager/quota_manager.cpp
+++ b/ydb/core/yq/libs/quota_manager/quota_manager.cpp
@@ -294,6 +294,13 @@ private:
auto itI = infoMap.find(metricName);
if (itI != infoMap.end()) {
auto& info = itI->second;
+ if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) {
+ // check hard limit only if quota is increased
+ if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
+ limit = info.HardLimit;
+ }
+ }
+
if (info.QuotaController != NActors::TActorId{}) {
pended = true;
cache.PendingLimitRequest = ev->Sender;
@@ -301,13 +308,6 @@ private:
cache.PendingLimit.insert(metricName);
Send(info.QuotaController, new TEvQuotaService::TQuotaLimitChangeRequest(subjectType, subjectId, metricName, cached.Usage.Limit.Value, limit));
continue;
- } else {
- if (cached.Usage.Limit.Value == 0 || limit == 0 || limit > cached.Usage.Limit.Value) {
- // check hard limit only if quota is increased
- if (info.HardLimit != 0 && (limit == 0 || limit > info.HardLimit)) {
- limit = info.HardLimit;
- }
- }
}
}
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt
index f382ea088b..227adce3a4 100644
--- a/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC
ydb-core-protos
libs-config-protos
yq-libs-events
+ libs-quota_manager-events
libs-rate_limiter-events
libs-rate_limiter-utils
yq-libs-shared_resources
@@ -26,4 +27,5 @@ target_link_libraries(libs-rate_limiter-control_plane_service PUBLIC
)
target_sources(libs-rate_limiter-control_plane_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp
)
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
index f3a1795b2e..a9a788470b 100644
--- a/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/rate_limiter_control_plane_service.cpp
@@ -1,7 +1,9 @@
#include "rate_limiter_control_plane_service.h"
+#include "update_limit_actor.h"
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/quota_manager/events/events.h>
#include <ydb/core/yq/libs/rate_limiter/events/control_plane_events.h>
#include <ydb/core/yq/libs/rate_limiter/utils/path.h>
#include <ydb/core/yq/libs/ydb/schema.h>
@@ -50,7 +52,7 @@ ERetryErrorClass RetryFunc(const NYdb::TStatus& status) {
return ERetryErrorClass::NoRetry;
}
-TYdbSdkRetryPolicy::TPtr MakeCreateSchemaRetryPolicy() {
+TYdbSdkRetryPolicy::TPtr MakeSchemaRetryPolicy() {
static auto policy = TYdbSdkRetryPolicy::GetExponentialBackoffPolicy(RetryFunc, TDuration::MilliSeconds(10), TDuration::Seconds(1), TDuration::Seconds(5));
return policy;
}
@@ -75,6 +77,11 @@ struct TRateLimiterRequestsQueue {
return *this;
}
+ TRateLimiterRequestsQueue& AddRequest(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ Queue.emplace_back(std::move(ev));
+ return *this;
+ }
+
void ProcessRequests() {
for (; Inflight.size() < MaxRequestsInflight && !Queue.empty(); Queue.pop_front(), NextCookie += CookieStep) {
ProcessRequest(NextCookie, Inflight.emplace(NextCookie, std::move(Queue.front())).first->second);
@@ -102,7 +109,12 @@ private:
{
}
- using TOriginalRequestType = std::variant<TEvRateLimiter::TEvCreateResource::TPtr, TEvRateLimiter::TEvDeleteResource::TPtr>;
+ explicit TRequest(TEvQuotaService::TQuotaLimitChangeRequest::TPtr&& ev)
+ : OriginalRequest(std::move(ev))
+ {
+ }
+
+ using TOriginalRequestType = std::variant<TEvRateLimiter::TEvCreateResource::TPtr, TEvRateLimiter::TEvDeleteResource::TPtr, TEvQuotaService::TQuotaLimitChangeRequest::TPtr>;
TOriginalRequestType OriginalRequest;
};
@@ -121,7 +133,7 @@ private:
RateLimiterPath,
GetRateLimiterResourcePath(ev->Get()->CloudId, ev->Get()->Scope, ev->Get()->QueryId),
{ev->Get()->CloudLimit, Nothing(), ev->Get()->QueryLimit},
- MakeCreateSchemaRetryPolicy(),
+ MakeSchemaRetryPolicy(),
cookie
)
);
@@ -136,7 +148,22 @@ private:
Connection,
RateLimiterPath,
GetRateLimiterResourcePath(ev->Get()->CloudId, ev->Get()->Scope, ev->Get()->QueryId),
- MakeCreateSchemaRetryPolicy(),
+ MakeSchemaRetryPolicy(),
+ cookie
+ )
+ );
+ }
+
+ void ProcessRequest(ui64 cookie, TRequest& req, TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ Y_UNUSED(req);
+ NActors::TActivationContext::AsActorContext().Register(
+ MakeUpdateCloudRateLimitActor(
+ NActors::TActivationContext::AsActorContext().SelfID,
+ Connection,
+ RateLimiterPath,
+ ev->Get()->SubjectId,
+ ev->Get()->LimitRequested,
+ MakeSchemaRetryPolicy(),
cookie
)
);
@@ -179,6 +206,23 @@ private:
);
}
+ void ProcessResponse(TRequest& req, TEvents::TEvSchemaUpdated::TPtr& ev) {
+ TEvQuotaService::TQuotaLimitChangeRequest::TPtr& originalRequest = std::get<TEvQuotaService::TQuotaLimitChangeRequest::TPtr>(req.OriginalRequest);
+ auto& record = *originalRequest->Get();
+ NActors::TActivationContext::AsActorContext().Send(
+ originalRequest->Sender,
+ new TEvQuotaService::TQuotaLimitChangeResponse(
+ record.SubjectType,
+ record.SubjectId,
+ record.MetricName,
+ ev->Get()->Result.IsSuccess() ? record.LimitRequested : record.Limit,
+ record.LimitRequested
+ ),
+ 0, // flags
+ originalRequest->Cookie
+ );
+ }
+
private:
const TString RateLimiterPath;
const TYdbConnectionPtr Connection;
@@ -217,7 +261,7 @@ public:
}
void RunCreateCoordinationNodeActor(const TString& path) {
- Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeCreateSchemaRetryPolicy()));
+ Register(MakeCreateCoordinationNodeActor(SelfId(), NKikimrServices::YQ_RATE_LIMITER, YdbConnection, path, MakeSchemaRetryPolicy()));
++CreatingCoordinationNodes;
}
@@ -266,6 +310,10 @@ public:
GetRateLimiter(ev->Get()->CloudId).AddRequest(ev);
}
+ void HandleInit(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ GetRateLimiter(ev->Get()->SubjectId).AddRequest(ev);
+ }
+
void HandleWorking(TEvRateLimiter::TEvCreateResource::TPtr& ev) {
GetRateLimiter(ev->Get()->CloudId).AddRequest(ev).ProcessRequests();
}
@@ -274,6 +322,10 @@ public:
GetRateLimiter(ev->Get()->CloudId).AddRequest(ev).ProcessRequests();
}
+ void HandleWorking(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ GetRateLimiter(ev->Get()->SubjectId).AddRequest(ev).ProcessRequests();
+ }
+
void HandleWorking(TEvents::TEvSchemaCreated::TPtr& ev) {
GetRateLimiterByCookie(ev->Cookie).OnResponse(ev);
}
@@ -282,6 +334,10 @@ public:
GetRateLimiterByCookie(ev->Cookie).OnResponse(ev);
}
+ void HandleWorking(TEvents::TEvSchemaUpdated::TPtr& ev) {
+ GetRateLimiterByCookie(ev->Cookie).OnResponse(ev);
+ }
+
void HandleOff(TEvRateLimiter::TEvCreateResource::TPtr& ev) {
Send(ev->Sender, new TEvRateLimiter::TEvCreateResourceResponse(""));
}
@@ -290,12 +346,25 @@ public:
Send(ev->Sender, new TEvRateLimiter::TEvDeleteResourceResponse(true));
}
+ void HandleOff(TEvQuotaService::TQuotaLimitChangeRequest::TPtr& ev) {
+ auto& record = *ev->Get();
+
+ Send(ev->Sender, new TEvQuotaService::TQuotaLimitChangeResponse(record.SubjectType, record.SubjectId, record.MetricName,
+ record.LimitRequested, record.LimitRequested));
+ }
+
+ void HandleQuotaUsage(TEvQuotaService::TQuotaUsageRequest::TPtr& ev) {
+ Send(ev->Sender, new TEvQuotaService::TQuotaUsageResponse(ev->Get()->SubjectType, ev->Get()->SubjectId, ev->Get()->MetricName, 0));
+ }
+
// State func that does nothing. Rate limiting is turned off.
// Answers "OK" responses and does nothing.
STRICT_STFUNC(
RateLimiterOffStateFunc,
hFunc(TEvRateLimiter::TEvCreateResource, HandleOff);
hFunc(TEvRateLimiter::TEvDeleteResource, HandleOff);
+ hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleOff);
+ hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage);
)
// State func that inits limiters.
@@ -305,6 +374,8 @@ public:
hFunc(TEvents::TEvSchemaCreated, HandleInit);
hFunc(TEvRateLimiter::TEvCreateResource, HandleInit);
hFunc(TEvRateLimiter::TEvDeleteResource, HandleInit);
+ hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleInit);
+ hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage);
)
// Working
@@ -314,6 +385,9 @@ public:
hFunc(TEvRateLimiter::TEvDeleteResource, HandleWorking);
hFunc(TEvents::TEvSchemaCreated, HandleWorking);
hFunc(TEvents::TEvSchemaDeleted, HandleWorking);
+ hFunc(TEvents::TEvSchemaUpdated, HandleWorking);
+ hFunc(TEvQuotaService::TQuotaLimitChangeRequest, HandleWorking);
+ hFunc(TEvQuotaService::TQuotaUsageRequest, HandleQuotaUsage);
)
TRateLimiterRequestsQueue& GetRateLimiter(const TString& cloudId) {
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp
new file mode 100644
index 0000000000..078a2da7e3
--- /dev/null
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.cpp
@@ -0,0 +1,119 @@
+#include "update_limit_actor.h"
+
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/yq/libs/events/events.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NYq {
+namespace {
+
+class TUpdateCloudRateLimitActor : public NActors::TActorBootstrapped<TUpdateCloudRateLimitActor> {
+public:
+ TUpdateCloudRateLimitActor(
+ NActors::TActorId parent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& cloudId,
+ ui64 limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : Parent(parent)
+ , Connection(std::move(connection))
+ , CoordinationNodePath(coordinationNodePath)
+ , CloudId(cloudId)
+ , Limit(limit)
+ , RetryPolicy(std::move(retryPolicy))
+ , Cookie(cookie)
+ {
+ }
+
+ void Bootstrap() {
+ Become(&TUpdateCloudRateLimitActor::StateFunc);
+
+ Register(
+ MakeUpdateRateLimiterResourceActor(
+ SelfId(),
+ NKikimrServices::YQ_RATE_LIMITER,
+ Connection,
+ CoordinationNodePath,
+ CloudId,
+ Limit * 10,
+ RetryPolicy
+ )
+ );
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvSchemaUpdated, Handle);
+ hFunc(TEvents::TEvSchemaCreated, Handle);
+ );
+
+ void Handle(TEvents::TEvSchemaUpdated::TPtr& ev) {
+ if (ev->Get()->Result.IsSuccess() || !IsPathDoesNotExistError(ev->Get()->Result)) {
+ SendResponseAndPassAway(std::move(ev->Get()->Result));
+ return;
+ }
+
+ // Path doesn't exist. Create it with new quota value
+ Register(
+ MakeCreateRateLimiterResourceActor(
+ SelfId(),
+ NKikimrServices::YQ_RATE_LIMITER,
+ Connection,
+ CoordinationNodePath,
+ CloudId,
+ {Limit * 10}, // percent -> milliseconds
+ RetryPolicy
+ )
+ );
+ }
+
+ void Handle(TEvents::TEvSchemaCreated::TPtr& ev) {
+ SendResponseAndPassAway(std::move(ev->Get()->Result));
+ }
+
+ void SendResponseAndPassAway(NYdb::TStatus status) {
+ Send(
+ Parent,
+ new TEvents::TEvSchemaUpdated(std::move(status)),
+ 0,
+ Cookie
+ );
+ PassAway();
+ }
+
+private:
+ const NActors::TActorId Parent;
+ const TYdbConnectionPtr Connection;
+ const TString CoordinationNodePath;
+ const TString CloudId;
+ const ui64 Limit;
+ const TYdbSdkRetryPolicy::TPtr RetryPolicy;
+ const ui64 Cookie;
+};
+
+} // anonymous namespace
+
+NActors::IActor* MakeUpdateCloudRateLimitActor(
+ NActors::TActorId parent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& cloudId,
+ ui64 limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+{
+ return new TUpdateCloudRateLimitActor(
+ parent,
+ std::move(connection),
+ coordinationNodePath,
+ cloudId,
+ limit,
+ std::move(retryPolicy),
+ cookie
+ );
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h
new file mode 100644
index 0000000000..fbc9135517
--- /dev/null
+++ b/ydb/core/yq/libs/rate_limiter/control_plane_service/update_limit_actor.h
@@ -0,0 +1,20 @@
+#pragma once
+#include <ydb/core/yq/libs/ydb/schema.h>
+#include <ydb/core/yq/libs/ydb/ydb.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+#include <util/generic/string.h>
+
+namespace NYq {
+
+NActors::IActor* MakeUpdateCloudRateLimitActor(
+ NActors::TActorId parent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& cloudId,
+ ui64 limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie);
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/schema.cpp b/ydb/core/yq/libs/ydb/schema.cpp
index 7e4135008b..4e82cfb47a 100644
--- a/ydb/core/yq/libs/ydb/schema.cpp
+++ b/ydb/core/yq/libs/ydb/schema.cpp
@@ -46,19 +46,6 @@ struct TEvPrivate {
};
};
-bool ParentPathDoesNotExist(const NYdb::TStatus& status) {
- if (status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
- return true;
- }
- if (status.GetStatus() == NYdb::EStatus::BAD_REQUEST) {
- const TString issuesText = status.GetIssues().ToString();
- if (issuesText.Contains("doesn't exist") || issuesText.Contains("does not exist")) {
- return true;
- }
- }
- return false;
-}
-
template <class TResponseEvent>
class TSchemaActorBase : public NActors::TActorBootstrapped<TSchemaActorBase<TResponseEvent>> {
public:
@@ -202,6 +189,30 @@ protected:
}
};
+class TUpdateActorBase : public TSchemaActorBase<TEvents::TEvSchemaUpdated> {
+public:
+ using TSchemaActorBase::TSchemaActorBase;
+
+protected:
+ void Handle(TEvents::TEvSchemaUpdated::TPtr& ev) override {
+ if (ev->Get()->Result.IsSuccess()) {
+ SCHEMA_LOG_DEBUG("Successfully updated " << GetEntityName());
+ ReplyAndDie(ev);
+ return;
+ }
+
+ SCHEMA_LOG_ERROR("Update " << GetEntityName() << " error: " << ev->Get()->Result.GetStatus() << " " << ev->Get()->Result.GetIssues().ToOneLineString());
+
+ if (!ScheduleNextAttempt(ev)) {
+ ReplyAndDie(ev);
+ }
+ }
+
+ TStringBuf GetActionName() const override {
+ return "update";
+ }
+};
+
template <class TRequestDesc, class TBase = TCreateActorBase>
class TRecursiveCreateActorBase : public TBase {
public:
@@ -228,7 +239,7 @@ protected:
this->CallAndSubscribe();
return;
}
- if (CurrentRequest > 0 && ParentPathDoesNotExist(ev->Get()->Result)) {
+ if (CurrentRequest > 0 && IsPathDoesNotExistError(ev->Get()->Result)) {
if (!TriedPaths[CurrentRequest - 1]) { // Defence from cycles.
--CurrentRequest;
TriedPaths[CurrentRequest] = true;
@@ -474,8 +485,54 @@ private:
const TString ResourcePath;
};
+class TUpdateRateLimiterResourceActor : public TUpdateActorBase {
+public:
+ TUpdateRateLimiterResourceActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& resourcePath,
+ TMaybe<ui64> limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+ : TUpdateActorBase(parent, logComponent, std::move(connection), std::move(retryPolicy), cookie)
+ , CoordinationNodePath(coordinationNodePath)
+ , ResourcePath(resourcePath)
+ , Limit(limit)
+ {
+ }
+
+private:
+ TString GetEntityName() const override {
+ return TStringBuilder() << "rate limiter resource \"" << ResourcePath << "\"";
+ }
+
+ NYdb::TAsyncStatus CallYdbSdk() override {
+ return Connection->RateLimiterClient.AlterResource(CoordinationNodePath, ResourcePath, NYdb::NRateLimiter::TAlterResourceSettings().MaxUnitsPerSecond(Limit));
+ }
+
+private:
+ const TString CoordinationNodePath;
+ const TString ResourcePath;
+ const TMaybe<ui64> Limit;
+};
+
} // namespace
+bool IsPathDoesNotExistError(const NYdb::TStatus& status) {
+ if (status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
+ return true;
+ }
+ if (status.GetStatus() == NYdb::EStatus::BAD_REQUEST) {
+ const TString issuesText = status.GetIssues().ToString();
+ if (issuesText.Contains("doesn't exist") || issuesText.Contains("does not exist")) {
+ return true;
+ }
+ }
+ return false;
+}
+
NActors::IActor* MakeCreateTableActor(
NActors::TActorId parent,
ui64 logComponent,
@@ -574,4 +631,26 @@ NActors::IActor* MakeDeleteRateLimiterResourceActor(
);
}
+NActors::IActor* MakeUpdateRateLimiterResourceActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& resourcePath,
+ TMaybe<double> limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie)
+{
+ return new TUpdateRateLimiterResourceActor(
+ parent,
+ logComponent,
+ std::move(connection),
+ coordinationNodePath,
+ resourcePath,
+ limit,
+ std::move(retryPolicy),
+ cookie
+ );
+}
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/ydb/schema.h b/ydb/core/yq/libs/ydb/schema.h
index e13660a59e..b685d41d1b 100644
--- a/ydb/core/yq/libs/ydb/schema.h
+++ b/ydb/core/yq/libs/ydb/schema.h
@@ -63,4 +63,18 @@ NActors::IActor* MakeDeleteRateLimiterResourceActor(
TYdbSdkRetryPolicy::TPtr retryPolicy,
ui64 cookie = 0);
+// Actor that updates rate limiter resource.
+// Sends TEvSchemaUpdated to parent (if any).
+NActors::IActor* MakeUpdateRateLimiterResourceActor(
+ NActors::TActorId parent,
+ ui64 logComponent,
+ TYdbConnectionPtr connection,
+ const TString& coordinationNodePath,
+ const TString& resourcePath,
+ TMaybe<double> limit,
+ TYdbSdkRetryPolicy::TPtr retryPolicy,
+ ui64 cookie = 0);
+
+bool IsPathDoesNotExistError(const NYdb::TStatus& status);
+
} // namespace NYq