aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcube <cube@yandex-team.ru>2022-02-10 16:52:24 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:24 +0300
commit94684d13d71865e531792fa1c4c8f243a89fe0e7 (patch)
treed3948cb3aed1dc873399f0c173086226ed8bcc65
parent70f16fb92d56f318d89b7f836104bc9ef6eb0067 (diff)
downloadydb-94684d13d71865e531792fa1c4c8f243a89fe0e7.tar.gz
Restoring authorship annotation for <cube@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/core/base/quoter.h10
-rw-r--r--ydb/core/grpc_services/base/base.h2
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h2
-rw-r--r--ydb/core/grpc_services/rpc_rate_limiter_api.cpp896
-rw-r--r--ydb/core/grpc_services/ya.make2
-rw-r--r--ydb/core/quoter/probes.h6
-rw-r--r--ydb/core/quoter/quoter_service.cpp70
-rw-r--r--ydb/core/quoter/quoter_service_impl.h6
-rw-r--r--ydb/core/quoter/quoter_service_ut.cpp68
-rw-r--r--ydb/public/api/grpc/ydb_rate_limiter_v1.proto6
-rw-r--r--ydb/public/api/protos/ydb_rate_limiter.proto60
-rw-r--r--ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp42
-rw-r--r--ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h20
-rw-r--r--ydb/services/rate_limiter/grpc_service.cpp2
-rw-r--r--ydb/services/rate_limiter/rate_limiter_ut.cpp38
15 files changed, 615 insertions, 615 deletions
diff --git a/ydb/core/base/quoter.h b/ydb/core/base/quoter.h
index 41a19775bc8..d477e52b2aa 100644
--- a/ydb/core/base/quoter.h
+++ b/ydb/core/base/quoter.h
@@ -40,24 +40,24 @@ struct TEvQuota {
const TString Resource;
const ui64 Amount;
- const bool IsUsedAmount;
+ const bool IsUsedAmount;
TResourceLeaf(const TResourceLeaf&) = default;
- TResourceLeaf(ui64 quoterId, ui64 resourceId, ui64 amount, bool isUsedAmount = false)
+ TResourceLeaf(ui64 quoterId, ui64 resourceId, ui64 amount, bool isUsedAmount = false)
: QuoterId(quoterId)
, ResourceId(resourceId)
, Amount(amount)
- , IsUsedAmount(isUsedAmount)
+ , IsUsedAmount(isUsedAmount)
{}
- TResourceLeaf(const TString &quoter, const TString &resource, ui64 amount, bool isUsedAmount = false)
+ TResourceLeaf(const TString &quoter, const TString &resource, ui64 amount, bool isUsedAmount = false)
: QuoterId(0)
, ResourceId(0)
, Quoter(quoter)
, Resource(resource)
, Amount(amount)
- , IsUsedAmount(isUsedAmount)
+ , IsUsedAmount(isUsedAmount)
{}
};
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 44b25c4a5f9..8fffd7589f8 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -113,7 +113,7 @@ struct TRpcServices {
EvDropRateLimiterResource,
EvListRateLimiterResources,
EvDescribeRateLimiterResource,
- EvAcquireRateLimiterResource,
+ EvAcquireRateLimiterResource,
EvKikhouseCreateSnapshot,
EvKikhouseRefreshSnapshot,
EvKikhouseDiscardSnapshot,
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index 97315f6e9fb..268f708907e 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -108,7 +108,7 @@ protected:
void Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx);
void Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx);
void Handle(TEvKikhouseCreateSnapshotRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvKikhouseRefreshSnapshotRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
index 5579be8bdb3..d926a414e32 100644
--- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
+++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
@@ -1,52 +1,52 @@
-#include "grpc_request_proxy.h"
-
-#include "rpc_calls.h"
-#include "rpc_scheme_base.h"
-#include "rpc_common.h"
-
+#include "grpc_request_proxy.h"
+
+#include "rpc_calls.h"
+#include "rpc_scheme_base.h"
+#include "rpc_common.h"
+
#include <ydb/core/base/quoter.h>
#include <ydb/core/kesus/tablet/events.h>
-
-namespace NKikimr::NGRpcService {
-
-using namespace NActors;
-using namespace Ydb;
-using namespace NKesus;
-
-namespace {
-
-template <typename TDerived, typename TRequest>
-class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest> {
-public:
- using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
-
+
+namespace NKikimr::NGRpcService {
+
+using namespace NActors;
+using namespace Ydb;
+using namespace NKesus;
+
+namespace {
+
+template <typename TDerived, typename TRequest>
+class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest> {
+public:
+ using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
+
TRateLimiterRequest(IRequestOpCtx* msg, bool trusted = false)
: TBase(msg)
, TrustedZone(trusted)
{}
- bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
- if (!ValidateResourcePath(resource.resource_path(), status, issues)) {
- return false;
- }
-
- if (resource.type_case() == Ydb::RateLimiter::Resource::TYPE_NOT_SET) {
- status = StatusIds::BAD_REQUEST;
- issues.AddIssue("No resource properties.");
- return false;
- }
-
- return true;
- }
-
- bool ValidateResourcePath(const TString& path, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
- if (path != CanonizeQuoterResourcePath(path)) {
- status = StatusIds::BAD_REQUEST;
- issues.AddIssue("Bad resource path.");
- return false;
- }
- return true;
- }
+ bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
+ if (!ValidateResourcePath(resource.resource_path(), status, issues)) {
+ return false;
+ }
+
+ if (resource.type_case() == Ydb::RateLimiter::Resource::TYPE_NOT_SET) {
+ status = StatusIds::BAD_REQUEST;
+ issues.AddIssue("No resource properties.");
+ return false;
+ }
+
+ return true;
+ }
+
+ bool ValidateResourcePath(const TString& path, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
+ if (path != CanonizeQuoterResourcePath(path)) {
+ status = StatusIds::BAD_REQUEST;
+ issues.AddIssue("Bad resource path.");
+ return false;
+ }
+ return true;
+ }
bool ValidateCoordinationNodePath(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
auto databaseName = this->Request_->GetDatabaseName()
@@ -68,456 +68,456 @@ protected:
private:
const bool TrustedZone;
};
-
-template <class TEvRequest>
-class TRateLimiterControlRequest : public TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest> {
-public:
- using TBase = TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest>;
- using TBase::TBase;
-
- void Bootstrap(const TActorContext& ctx) {
- TBase::Bootstrap(ctx);
-
- this->Become(&TRateLimiterControlRequest::StateFunc);
-
- Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- NYql::TIssues issues;
+
+template <class TEvRequest>
+class TRateLimiterControlRequest : public TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest> {
+public:
+ using TBase = TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest>;
+ using TBase::TBase;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+
+ this->Become(&TRateLimiterControlRequest::StateFunc);
+
+ Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ NYql::TIssues issues;
if (!this->ValidateCoordinationNodePath(status, issues)) {
this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId()));
return;
}
- if (!ValidateRequest(status, issues)) {
- this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- ResolveCoordinationPath();
- }
-
-protected:
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- hFunc(TEvTabletPipe::TEvClientConnected, Handle);
- hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- default:
- return TBase::StateFuncBase(ev, ctx);
- }
- }
-
- void ResolveCoordinationPath() {
+ if (!ValidateRequest(status, issues)) {
+ this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ ResolveCoordinationPath();
+ }
+
+protected:
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ default:
+ return TBase::StateFuncBase(ev, ctx);
+ }
+ }
+
+ void ResolveCoordinationPath() {
TVector<TString> path = NKikimr::SplitPath(this->GetCoordinationNodePath());
- if (path.empty()) {
- this->Reply(StatusIds::BAD_REQUEST, "Empty path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- req->ResultSet.emplace_back();
- req->ResultSet.back().Path.swap(path);
- req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
- this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(req), 0, 0);
- }
-
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- THolder<NSchemeCache::TSchemeCacheNavigate> navigate = std::move(ev->Get()->Request);
- if (navigate->ResultSet.size() != 1 || navigate->ErrorCount > 0) {
- this->Reply(StatusIds::INTERNAL_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- const auto& entry = navigate->ResultSet.front();
- if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- this->Reply(StatusIds::SCHEME_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindKesus) {
- this->Reply(StatusIds::BAD_REQUEST, "Path is not a coordination node path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- if (!entry.KesusInfo) {
- this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node info found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- KesusTabletId = entry.KesusInfo->Description.GetKesusTabletId();
-
- if (!KesusTabletId) {
- this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node id found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
-
- CreatePipe();
-
- SendRequest();
- }
-
- NTabletPipe::TClientConfig GetPipeConfig() {
- NTabletPipe::TClientConfig cfg;
+ if (path.empty()) {
+ this->Reply(StatusIds::BAD_REQUEST, "Empty path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ req->ResultSet.emplace_back();
+ req->ResultSet.back().Path.swap(path);
+ req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(req), 0, 0);
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ THolder<NSchemeCache::TSchemeCacheNavigate> navigate = std::move(ev->Get()->Request);
+ if (navigate->ResultSet.size() != 1 || navigate->ErrorCount > 0) {
+ this->Reply(StatusIds::INTERNAL_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ const auto& entry = navigate->ResultSet.front();
+ if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ this->Reply(StatusIds::SCHEME_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindKesus) {
+ this->Reply(StatusIds::BAD_REQUEST, "Path is not a coordination node path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ if (!entry.KesusInfo) {
+ this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node info found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ KesusTabletId = entry.KesusInfo->Description.GetKesusTabletId();
+
+ if (!KesusTabletId) {
+ this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node id found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+
+ CreatePipe();
+
+ SendRequest();
+ }
+
+ NTabletPipe::TClientConfig GetPipeConfig() {
+ NTabletPipe::TClientConfig cfg;
cfg.RetryPolicy = {
.RetryLimitCount = 3u
};
- return cfg;
- }
-
- void CreatePipe() {
- KesusPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KesusTabletId, GetPipeConfig()));
- }
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
- if (ev->Get()->Status != NKikimrProto::OK) {
- this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId()));
- }
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
- this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId()));
- }
-
- void ReplyFromKesusError(const NKikimrKesus::TKesusError& err) {
- this->Reply(err.GetStatus(), err.GetIssues(), TActivationContext::ActorContextFor(this->SelfId()));
- }
-
- virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0;
-
- virtual void SendRequest() = 0;
-
- void PassAway() override {
- if (KesusPipeClient) {
- NTabletPipe::CloseClient(this->SelfId(), KesusPipeClient);
- KesusPipeClient = {};
- }
- TBase::PassAway();
- }
-
-protected:
- ui64 KesusTabletId = 0;
- TActorId KesusPipeClient;
-};
-
-static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStreamingQuoterResource& dst) {
- dst.SetResourcePath(src.resource_path());
- const auto& srcProps = src.hierarchical_drr();
- auto& props = *dst.MutableHierarhicalDRRResourceConfig();
- props.SetMaxUnitsPerSecond(srcProps.max_units_per_second());
- props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient());
+ return cfg;
+ }
+
+ void CreatePipe() {
+ KesusPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KesusTabletId, GetPipeConfig()));
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId()));
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId()));
+ }
+
+ void ReplyFromKesusError(const NKikimrKesus::TKesusError& err) {
+ this->Reply(err.GetStatus(), err.GetIssues(), TActivationContext::ActorContextFor(this->SelfId()));
+ }
+
+ virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0;
+
+ virtual void SendRequest() = 0;
+
+ void PassAway() override {
+ if (KesusPipeClient) {
+ NTabletPipe::CloseClient(this->SelfId(), KesusPipeClient);
+ KesusPipeClient = {};
+ }
+ TBase::PassAway();
+ }
+
+protected:
+ ui64 KesusTabletId = 0;
+ TActorId KesusPipeClient;
+};
+
+static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStreamingQuoterResource& dst) {
+ dst.SetResourcePath(src.resource_path());
+ const auto& srcProps = src.hierarchical_drr();
+ auto& props = *dst.MutableHierarhicalDRRResourceConfig();
+ props.SetMaxUnitsPerSecond(srcProps.max_units_per_second());
+ props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient());
props.SetPrefetchCoefficient(srcProps.prefetch_coefficient());
props.SetPrefetchWatermark(srcProps.prefetch_watermark());
-}
-
-static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) {
- dst.set_resource_path(src.GetResourcePath());
- const auto& srcProps = src.GetHierarhicalDRRResourceConfig();
- auto& props = *dst.mutable_hierarchical_drr();
- props.set_max_units_per_second(srcProps.GetMaxUnitsPerSecond());
- props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient());
+}
+
+static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) {
+ dst.set_resource_path(src.GetResourcePath());
+ const auto& srcProps = src.GetHierarhicalDRRResourceConfig();
+ auto& props = *dst.mutable_hierarchical_drr();
+ props.set_max_units_per_second(srcProps.GetMaxUnitsPerSecond());
+ props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient());
props.set_prefetch_coefficient(srcProps.GetPrefetchCoefficient());
props.set_prefetch_watermark(srcProps.GetPrefetchWatermark());
-}
-
-class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> {
-public:
- using TBase = TRateLimiterControlRequest<TEvCreateRateLimiterResource>;
- using TBase::TBase;
- using TBase::Handle;
-
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKesus::TEvAddQuoterResourceResult, Handle);
- default:
- return TBase::StateFunc(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
+}
+
+class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> {
+public:
+ using TBase = TRateLimiterControlRequest<TEvCreateRateLimiterResource>;
+ using TBase::TBase;
+ using TBase::Handle;
+
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKesus::TEvAddQuoterResourceResult, Handle);
+ default:
+ return TBase::StateFunc(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
return ValidateResource(GetProtoRequest()->resource(), status, issues);
- }
-
- void SendRequest() override {
- Become(&TCreateRateLimiterResourceRPC::StateFunc);
-
- THolder<TEvKesus::TEvAddQuoterResource> req = MakeHolder<TEvKesus::TEvAddQuoterResource>();
+ }
+
+ void SendRequest() override {
+ Become(&TCreateRateLimiterResourceRPC::StateFunc);
+
+ THolder<TEvKesus::TEvAddQuoterResource> req = MakeHolder<TEvKesus::TEvAddQuoterResource>();
CopyProps(GetProtoRequest()->resource(), *req->Record.MutableResource());
- NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
- }
-
- void Handle(TEvKesus::TEvAddQuoterResourceResult::TPtr& ev) {
- ReplyFromKesusError(ev->Get()->Record.GetError());
- }
-};
-
-class TAlterRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvAlterRateLimiterResource> {
-public:
- using TBase = TRateLimiterControlRequest<TEvAlterRateLimiterResource>;
- using TBase::TBase;
- using TBase::Handle;
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKesus::TEvUpdateQuoterResourceResult, Handle);
- default:
- return TBase::StateFunc(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
+ NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
+ }
+
+ void Handle(TEvKesus::TEvAddQuoterResourceResult::TPtr& ev) {
+ ReplyFromKesusError(ev->Get()->Record.GetError());
+ }
+};
+
+class TAlterRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvAlterRateLimiterResource> {
+public:
+ using TBase = TRateLimiterControlRequest<TEvAlterRateLimiterResource>;
+ using TBase::TBase;
+ using TBase::Handle;
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKesus::TEvUpdateQuoterResourceResult, Handle);
+ default:
+ return TBase::StateFunc(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
return ValidateResource(GetProtoRequest()->resource(), status, issues);
- }
-
- void SendRequest() override {
- Become(&TAlterRateLimiterResourceRPC::StateFunc);
-
- THolder<TEvKesus::TEvUpdateQuoterResource> req = MakeHolder<TEvKesus::TEvUpdateQuoterResource>();
+ }
+
+ void SendRequest() override {
+ Become(&TAlterRateLimiterResourceRPC::StateFunc);
+
+ THolder<TEvKesus::TEvUpdateQuoterResource> req = MakeHolder<TEvKesus::TEvUpdateQuoterResource>();
CopyProps(GetProtoRequest()->resource(), *req->Record.MutableResource());
- NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
- }
-
- void Handle(TEvKesus::TEvUpdateQuoterResourceResult::TPtr& ev) {
- ReplyFromKesusError(ev->Get()->Record.GetError());
- }
-};
-
-class TDropRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDropRateLimiterResource> {
-public:
- using TBase = TRateLimiterControlRequest<TEvDropRateLimiterResource>;
- using TBase::TBase;
- using TBase::Handle;
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKesus::TEvDeleteQuoterResourceResult, Handle);
- default:
- return TBase::StateFunc(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
+ NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
+ }
+
+ void Handle(TEvKesus::TEvUpdateQuoterResourceResult::TPtr& ev) {
+ ReplyFromKesusError(ev->Get()->Record.GetError());
+ }
+};
+
+class TDropRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDropRateLimiterResource> {
+public:
+ using TBase = TRateLimiterControlRequest<TEvDropRateLimiterResource>;
+ using TBase::TBase;
+ using TBase::Handle;
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKesus::TEvDeleteQuoterResourceResult, Handle);
+ default:
+ return TBase::StateFunc(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
return ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues);
- }
-
- void SendRequest() override {
- Become(&TDropRateLimiterResourceRPC::StateFunc);
-
- THolder<TEvKesus::TEvDeleteQuoterResource> req = MakeHolder<TEvKesus::TEvDeleteQuoterResource>();
+ }
+
+ void SendRequest() override {
+ Become(&TDropRateLimiterResourceRPC::StateFunc);
+
+ THolder<TEvKesus::TEvDeleteQuoterResource> req = MakeHolder<TEvKesus::TEvDeleteQuoterResource>();
req->Record.SetResourcePath(GetProtoRequest()->resource_path());
- NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
- }
-
- void Handle(TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev) {
- ReplyFromKesusError(ev->Get()->Record.GetError());
- }
-};
-
-class TListRateLimiterResourcesRPC : public TRateLimiterControlRequest<TEvListRateLimiterResources> {
-public:
- using TBase = TRateLimiterControlRequest<TEvListRateLimiterResources>;
- using TBase::TBase;
- using TBase::Handle;
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle);
- default:
- return TBase::StateFunc(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
+ NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
+ }
+
+ void Handle(TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev) {
+ ReplyFromKesusError(ev->Get()->Record.GetError());
+ }
+};
+
+class TListRateLimiterResourcesRPC : public TRateLimiterControlRequest<TEvListRateLimiterResources> {
+public:
+ using TBase = TRateLimiterControlRequest<TEvListRateLimiterResources>;
+ using TBase::TBase;
+ using TBase::Handle;
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle);
+ default:
+ return TBase::StateFunc(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
if (const TString& path = GetProtoRequest()->resource_path()) {
- return ValidateResourcePath(path, status, issues);
- }
- return true;
- }
-
- void SendRequest() override {
- Become(&TListRateLimiterResourcesRPC::StateFunc);
-
- THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>();
+ return ValidateResourcePath(path, status, issues);
+ }
+ return true;
+ }
+
+ void SendRequest() override {
+ Become(&TListRateLimiterResourcesRPC::StateFunc);
+
+ THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>();
if (const TString& path = GetProtoRequest()->resource_path()) {
- req->Record.AddResourcePaths(path);
- }
+ req->Record.AddResourcePaths(path);
+ }
req->Record.SetRecursive(GetProtoRequest()->recursive());
- NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
- }
-
- void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) {
- const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError();
- if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) {
- Ydb::RateLimiter::ListResourcesResult result;
- for (const auto& resource : ev->Get()->Record.GetResources()) {
- result.add_resource_paths(resource.GetResourcePath());
- }
- Request_->SendResult(result, Ydb::StatusIds::SUCCESS);
- PassAway();
- } else {
- ReplyFromKesusError(kesusError);
- }
- }
-};
-
-class TDescribeRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDescribeRateLimiterResource> {
-public:
- using TBase = TRateLimiterControlRequest<TEvDescribeRateLimiterResource>;
- using TBase::TBase;
- using TBase::Handle;
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle);
- default:
- return TBase::StateFunc(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
+ NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
+ }
+
+ void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) {
+ const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError();
+ if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) {
+ Ydb::RateLimiter::ListResourcesResult result;
+ for (const auto& resource : ev->Get()->Record.GetResources()) {
+ result.add_resource_paths(resource.GetResourcePath());
+ }
+ Request_->SendResult(result, Ydb::StatusIds::SUCCESS);
+ PassAway();
+ } else {
+ ReplyFromKesusError(kesusError);
+ }
+ }
+};
+
+class TDescribeRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDescribeRateLimiterResource> {
+public:
+ using TBase = TRateLimiterControlRequest<TEvDescribeRateLimiterResource>;
+ using TBase::TBase;
+ using TBase::Handle;
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle);
+ default:
+ return TBase::StateFunc(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override {
return ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues);
- }
-
- void SendRequest() override {
- Become(&TDescribeRateLimiterResourceRPC::StateFunc);
-
- THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>();
+ }
+
+ void SendRequest() override {
+ Become(&TDescribeRateLimiterResourceRPC::StateFunc);
+
+ THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>();
req->Record.AddResourcePaths(GetProtoRequest()->resource_path());
- NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
- }
-
- void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) {
- const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError();
- if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) {
- Ydb::RateLimiter::DescribeResourceResult result;
- if (ev->Get()->Record.ResourcesSize() == 0) {
- this->Reply(StatusIds::INTERNAL_ERROR, "No resource properties found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
- return;
- }
- CopyProps(ev->Get()->Record.GetResources(0), *result.mutable_resource());
- Request_->SendResult(result, Ydb::StatusIds::SUCCESS);
- PassAway();
- } else {
- ReplyFromKesusError(kesusError);
- }
- }
-};
-
-class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource> {
-public:
- using TBase = TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource>;
- using TBase::TBase;
-
- void Bootstrap(const TActorContext& ctx) {
- TBase::Bootstrap(ctx);
-
- Become(&TAcquireRateLimiterResourceRPC::StateFunc);
-
- Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- NYql::TIssues issues;
+ NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0);
+ }
+
+ void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) {
+ const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError();
+ if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) {
+ Ydb::RateLimiter::DescribeResourceResult result;
+ if (ev->Get()->Record.ResourcesSize() == 0) {
+ this->Reply(StatusIds::INTERNAL_ERROR, "No resource properties found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TActivationContext::ActorContextFor(this->SelfId()));
+ return;
+ }
+ CopyProps(ev->Get()->Record.GetResources(0), *result.mutable_resource());
+ Request_->SendResult(result, Ydb::StatusIds::SUCCESS);
+ PassAway();
+ } else {
+ ReplyFromKesusError(kesusError);
+ }
+ }
+};
+
+class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource> {
+public:
+ using TBase = TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource>;
+ using TBase::TBase;
+
+ void Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+
+ Become(&TAcquireRateLimiterResourceRPC::StateFunc);
+
+ Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ NYql::TIssues issues;
if (!ValidateCoordinationNodePath(status, issues)) {
Reply(status, issues, TActivationContext::AsActorContext());
return;
}
- if (!ValidateRequest(status, issues)) {
- Reply(status, issues, TActivationContext::AsActorContext());
- return;
- }
-
- SendRequest();
- }
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvQuota::TEvClearance, Handle);
- default:
- return TBase::StateFuncBase(ev, ctx);
- }
- }
-
- bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
+ if (!ValidateRequest(status, issues)) {
+ Reply(status, issues, TActivationContext::AsActorContext());
+ return;
+ }
+
+ SendRequest();
+ }
+
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvQuota::TEvClearance, Handle);
+ default:
+ return TBase::StateFuncBase(ev, ctx);
+ }
+ }
+
+ bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
if (!ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues)) {
- return false;
- }
-
+ return false;
+ }
+
if (GetProtoRequest()->units_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::UNITS_NOT_SET) {
- return false;
- }
-
- return true;
- }
-
- void SendRequest() {
- Become(&TAcquireRateLimiterResourceRPC::StateFunc);
-
+ return false;
+ }
+
+ return true;
+ }
+
+ void SendRequest() {
+ Become(&TAcquireRateLimiterResourceRPC::StateFunc);
+
if (GetProtoRequest()->units_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::kRequired) {
- SendLeaf(
+ SendLeaf(
TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(),
GetProtoRequest()->resource_path(),
GetProtoRequest()->required()));
- return;
+ return;
}
-
- SendLeaf(
+
+ SendLeaf(
TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(),
GetProtoRequest()->resource_path(),
GetProtoRequest()->used(),
- true));
- }
-
- void SendLeaf(const TEvQuota::TResourceLeaf& leaf) {
- Send(MakeQuoterServiceID(),
+ true));
+ }
+
+ void SendLeaf(const TEvQuota::TResourceLeaf& leaf) {
+ Send(MakeQuoterServiceID(),
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, GetOperationTimeout()), 0, 0);
- }
-
- void Handle(TEvQuota::TEvClearance::TPtr& ev) {
- switch (ev->Get()->Result) {
- case TEvQuota::TEvClearance::EResult::Success:
- Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext());
- break;
- case TEvQuota::TEvClearance::EResult::UnknownResource:
- Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext());
- break;
- case TEvQuota::TEvClearance::EResult::Deadline:
- Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext());
- break;
- default:
- Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
- }
- }
-};
-
-} // namespace
-
-void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release()));
-}
-
-void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release()));
-}
-
-void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release()));
-}
-
-void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release()));
-}
-
-void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release()));
-}
-
-void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release()));
-}
-
+ }
+
+ void Handle(TEvQuota::TEvClearance::TPtr& ev) {
+ switch (ev->Get()->Result) {
+ case TEvQuota::TEvClearance::EResult::Success:
+ Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext());
+ break;
+ case TEvQuota::TEvClearance::EResult::UnknownResource:
+ Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext());
+ break;
+ case TEvQuota::TEvClearance::EResult::Deadline:
+ Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext());
+ break;
+ default:
+ Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
+ }
+ }
+};
+
+} // namespace
+
+void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release()));
+}
+
+void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release()));
+}
+
+void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release()));
+}
+
+void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release()));
+}
+
+void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release()));
+}
+
+void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
+ ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release()));
+}
+
template<>
IActor* TEvAcquireRateLimiterResource::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
return new TAcquireRateLimiterResourceRPC(msg, true);
}
-} // namespace NKikimr::NGRpcService
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make
index 05b156bf043..7ae3d5cb2d9 100644
--- a/ydb/core/grpc_services/ya.make
+++ b/ydb/core/grpc_services/ya.make
@@ -58,7 +58,7 @@ SRCS(
rpc_modify_permissions.cpp
rpc_monitoring.cpp
rpc_prepare_data_query.cpp
- rpc_rate_limiter_api.cpp
+ rpc_rate_limiter_api.cpp
rpc_read_columns.cpp
rpc_read_table.cpp
rpc_remove_directory.cpp
diff --git a/ydb/core/quoter/probes.h b/ydb/core/quoter/probes.h
index c5ef9aca125..ee9c14f6eb9 100644
--- a/ydb/core/quoter/probes.h
+++ b/ydb/core/quoter/probes.h
@@ -24,9 +24,9 @@
PROBE(StartCharging, GROUPS("QuoterService", "ClientRequest"), \
TYPES(TString, TString, ui64, ui64), \
NAMES("quoter", "resource", "quoterId", "resourceId")) \
- PROBE(Charge, GROUPS("QuoterService", "ClientRequest"), \
- TYPES(TString, TString, ui64, ui64), \
- NAMES("quoter", "resource", "quoterId", "resourceId")) \
+ PROBE(Charge, GROUPS("QuoterService", "ClientRequest"), \
+ TYPES(TString, TString, ui64, ui64), \
+ NAMES("quoter", "resource", "quoterId", "resourceId")) \
\
PROBE(AllocateResource, GROUPS("QuoterService", "Resource"), \
TYPES(TString, TString, ui64, ui64, ui64, double, ui64, double, double, double), \
diff --git a/ydb/core/quoter/quoter_service.cpp b/ydb/core/quoter/quoter_service.cpp
index b9a1b629f51..dea19f4fd9e 100644
--- a/ydb/core/quoter/quoter_service.cpp
+++ b/ydb/core/quoter/quoter_service.cpp
@@ -147,7 +147,7 @@ TResourceLeaf& TResState::Get(ui32 idx) {
return Leafs[idx];
}
-ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx) {
+ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx) {
ui32 idx;
if (Unused) {
idx = Unused.back();
@@ -160,7 +160,7 @@ ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui
auto &x = Leafs[idx];
x.Resource = resource;
x.Amount = amount;
- x.IsUsedAmount = isUsedAmount;
+ x.IsUsedAmount = isUsedAmount;
x.RequestIdx = requestIdx;
Y_VERIFY_DEBUG(x.NextInWaitQueue == Max<ui32>());
@@ -232,12 +232,12 @@ void TResource::StopStarvation(TInstant now) {
TDuration TResource::Charge(TRequest& request, TResourceLeaf& leaf, TInstant now) {
MarkStartedCharging(request, leaf, now);
- if (leaf.IsUsedAmount) {
- ChargeUsedAmount(leaf.Amount, now);
- Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds());
- return TDuration::Zero();
- }
-
+ if (leaf.IsUsedAmount) {
+ ChargeUsedAmount(leaf.Amount, now);
+ Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds());
+ return TDuration::Zero();
+ }
+
const TDuration result = Charge(leaf.Amount, now);
if (result == TDuration::Zero()) {
Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds());
@@ -245,24 +245,24 @@ TDuration TResource::Charge(TRequest& request, TResourceLeaf& leaf, TInstant now
return result;
}
-void TResource::ChargeUsedAmount(double amount, TInstant now) {
- BLOG_T("ChargeUsedAmount \"" << Resource << "\" for " << amount
- << ". Balance: " << Balance
- << ". FreeBalance: " << FreeBalance
- << ". Now: " << now);
- LastAllocated = now;
- FreeBalance -= amount;
- Balance -= amount;
- AmountConsumed += amount;
+void TResource::ChargeUsedAmount(double amount, TInstant now) {
+ BLOG_T("ChargeUsedAmount \"" << Resource << "\" for " << amount
+ << ". Balance: " << Balance
+ << ". FreeBalance: " << FreeBalance
+ << ". Now: " << now);
+ LastAllocated = now;
+ FreeBalance -= amount;
+ Balance -= amount;
+ AmountConsumed += amount;
History.Add(now, amount);
- Counters.Consumed->Add(static_cast<i64>(amount));
- if (Balance >= 0.0) {
- StopStarvation(now);
- return;
- }
- StartStarvation(now);
-}
-
+ Counters.Consumed->Add(static_cast<i64>(amount));
+ if (Balance >= 0.0) {
+ StopStarvation(now);
+ return;
+ }
+ StartStarvation(now);
+}
+
TDuration TResource::Charge(double amount, TInstant now) {
// Zero - charged
// Max - not in current tick (or resource already queued)
@@ -587,7 +587,7 @@ TQuoterService::EInitLeafStatus TQuoterService::InitResourceLeaf(const TEvQuota:
quoter->WaitingQueueResolve.emplace(reqIdx);
// todo: make generic 'leaf for resolve' helper
- const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx);
+ const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx);
TResourceLeaf& resLeaf = ResState.Get(resLeafIdx);
resLeaf.QuoterId = quoterId;
@@ -618,7 +618,7 @@ TQuoterService::EInitLeafStatus TQuoterService::InitResourceLeaf(const TEvQuota:
auto rIndxIt = quoter->WaitingResource.emplace(leaf.Resource, TSet<ui32>());
rIndxIt.first->second.emplace(reqIdx);
- const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx);
+ const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx);
TResourceLeaf& resLeaf = ResState.Get(resLeafIdx);
resLeaf.QuoterId = quoterId;
@@ -693,13 +693,13 @@ TQuoterService::EInitLeafStatus TQuoterService::TryCharge(TResource& quores, ui6
const TInstant now = TActivationContext::Now();
bool startedCharge = false;
LWTRACK(ResourceQueueState, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId, quores.QueueSize, quores.QueueWeight);
- if (leaf.IsUsedAmount) {
- quores.ChargeUsedAmount(leaf.Amount, now);
- LWTRACK(Charge, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId);
- quores.Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds());
- return EInitLeafStatus::Charged;
- }
-
+ if (leaf.IsUsedAmount) {
+ quores.ChargeUsedAmount(leaf.Amount, now);
+ LWTRACK(Charge, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId);
+ quores.Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds());
+ return EInitLeafStatus::Charged;
+ }
+
if (quores.QueueSize == 0) {
startedCharge = true;
const TDuration delay = quores.Charge(leaf.Amount, now);
@@ -714,7 +714,7 @@ TQuoterService::EInitLeafStatus TQuoterService::TryCharge(TResource& quores, ui6
}
// need wait entry for resource
- const ui32 resLeafIdx = ResState.Allocate(&quores, leaf.Amount, leaf.IsUsedAmount, reqIdx);
+ const ui32 resLeafIdx = ResState.Allocate(&quores, leaf.Amount, leaf.IsUsedAmount, reqIdx);
TResourceLeaf& resLeaf = ResState.Get(resLeafIdx);
resLeaf.State = EResourceState::Wait;
diff --git a/ydb/core/quoter/quoter_service_impl.h b/ydb/core/quoter/quoter_service_impl.h
index d27ea134e19..114ea2c091b 100644
--- a/ydb/core/quoter/quoter_service_impl.h
+++ b/ydb/core/quoter/quoter_service_impl.h
@@ -86,7 +86,7 @@ enum class EResourceState {
struct TResourceLeaf {
TResource *Resource = nullptr;
ui64 Amount = Max<ui64>();
- bool IsUsedAmount = false;
+ bool IsUsedAmount = false;
ui32 RequestIdx = Max<ui32>();
@@ -113,7 +113,7 @@ private:
TVector<ui32> Unused;
public:
TResourceLeaf& Get(ui32 idx);
- ui32 Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx);
+ ui32 Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx);
void FreeChain(ui32 headIdx);
};
@@ -275,7 +275,7 @@ class TQuoterService : public TActorBootstrapped<TQuoterService> {
EInitLeafStatus InitSystemLeaf(const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx);
EInitLeafStatus InitResourceLeaf(const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx);
EInitLeafStatus TryCharge(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx);
- EInitLeafStatus NotifyUsed(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request);
+ EInitLeafStatus NotifyUsed(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request);
void MarkScheduleAllocation(TResource& quores, TDuration delay, TInstant now);
void InitialRequestProcessing(TEvQuota::TEvRequest::TPtr &ev, const ui32 reqIdx);
diff --git a/ydb/core/quoter/quoter_service_ut.cpp b/ydb/core/quoter/quoter_service_ut.cpp
index 88c74ca3fb1..f169bb6a074 100644
--- a/ydb/core/quoter/quoter_service_ut.cpp
+++ b/ydb/core/quoter/quoter_service_ut.cpp
@@ -71,42 +71,42 @@ Y_UNIT_TEST_SUITE(TQuoterServiceTest) {
UNIT_ASSERT_VALUES_EQUAL(ev->Cookie, 300 + i);
}
}
-
- {
- auto resId = TEvQuota::TResourceLeaf::MakeTaggedRateRes(1, 1);
-
- runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
- new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
- TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1)
- }, TDuration::Max())));
-
- THolder<TEvQuota::TEvClearance> reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
+
+ {
+ auto resId = TEvQuota::TResourceLeaf::MakeTaggedRateRes(1, 1);
+
+ runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
+ new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
+ TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1)
+ }, TDuration::Max())));
+
+ THolder<TEvQuota::TEvClearance> reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success);
-
- runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
- new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
- TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true)
- }, TDuration::Seconds(1))));
-
- reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
+
+ runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
+ new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
+ TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true)
+ }, TDuration::Seconds(1))));
+
+ reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
+ UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success);
+
+ runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
+ new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
+ TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000)
+ }, TDuration::Seconds(1))));
+
+ reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
+ UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Deadline);
+
+ runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
+ new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
+ TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true)
+ }, TDuration::Seconds(1))));
+
+ reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success);
-
- runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
- new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
- TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000)
- }, TDuration::Seconds(1))));
-
- reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
- UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Deadline);
-
- runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender,
- new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, {
- TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true)
- }, TDuration::Seconds(1))));
-
- reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>();
- UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success);
- }
+ }
}
#if defined(OPTIMIZED)
diff --git a/ydb/public/api/grpc/ydb_rate_limiter_v1.proto b/ydb/public/api/grpc/ydb_rate_limiter_v1.proto
index be477d1dd59..b8431847285 100644
--- a/ydb/public/api/grpc/ydb_rate_limiter_v1.proto
+++ b/ydb/public/api/grpc/ydb_rate_limiter_v1.proto
@@ -29,7 +29,7 @@ service RateLimiterService {
// Describe properties of resource in coordination node.
rpc DescribeResource(DescribeResourceRequest) returns (DescribeResourceResponse);
-
- // Take units for usage of a resource in coordination node.
- rpc AcquireResource(AcquireResourceRequest) returns (AcquireResourceResponse);
+
+ // Take units for usage of a resource in coordination node.
+ rpc AcquireResource(AcquireResourceRequest) returns (AcquireResourceResponse);
}
diff --git a/ydb/public/api/protos/ydb_rate_limiter.proto b/ydb/public/api/protos/ydb_rate_limiter.proto
index c18430c32b6..d44e3b5f966 100644
--- a/ydb/public/api/protos/ydb_rate_limiter.proto
+++ b/ydb/public/api/protos/ydb_rate_limiter.proto
@@ -175,33 +175,33 @@ message DescribeResourceResponse {
message DescribeResourceResult {
Resource resource = 1;
}
-
-//
-// AcquireResource method.
-//
-
-message AcquireResourceRequest {
- Ydb.Operations.OperationParams operation_params = 1;
-
- // Path of a coordination node.
- string coordination_node_path = 2;
-
- // Path of resource inside a coordination node.
- string resource_path = 3;
-
- oneof units {
- // Request resource's units for usage.
- uint64 required = 4;
-
- // Actually used resource's units by client.
- uint64 used = 5;
- }
-}
-
-message AcquireResourceResponse {
- // Holds AcquireResourceResult in case of successful call.
- Ydb.Operations.Operation operation = 1;
-}
-
-message AcquireResourceResult {
-}
+
+//
+// AcquireResource method.
+//
+
+message AcquireResourceRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ // Path of a coordination node.
+ string coordination_node_path = 2;
+
+ // Path of resource inside a coordination node.
+ string resource_path = 3;
+
+ oneof units {
+ // Request resource's units for usage.
+ uint64 required = 4;
+
+ // Actually used resource's units by client.
+ uint64 used = 5;
+ }
+}
+
+message AcquireResourceResponse {
+ // Holds AcquireResourceResult in case of successful call.
+ Ydb.Operations.Operation operation = 1;
+}
+
+message AcquireResourceResult {
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp
index bea557812c6..23e8942221c 100644
--- a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp
@@ -169,23 +169,23 @@ public:
return promise.GetFuture();
}
-
- TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) {
- auto request = MakeOperationRequest<Ydb::RateLimiter::AcquireResourceRequest>(settings);
- request.set_coordination_node_path(coordinationNodePath);
- request.set_resource_path(resourcePath);
-
- if (settings.IsUsedAmount_) {
- request.set_used(settings.Amount_.GetRef());
- } else {
- request.set_required(settings.Amount_.GetRef());
- }
-
- return RunSimple<Ydb::RateLimiter::V1::RateLimiterService, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse>(
- std::move(request),
- &Ydb::RateLimiter::V1::RateLimiterService::Stub::AsyncAcquireResource,
- TRpcRequestSettings::Make(settings),
- settings.ClientTimeout_);
+
+ TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::RateLimiter::AcquireResourceRequest>(settings);
+ request.set_coordination_node_path(coordinationNodePath);
+ request.set_resource_path(resourcePath);
+
+ if (settings.IsUsedAmount_) {
+ request.set_used(settings.Amount_.GetRef());
+ } else {
+ request.set_required(settings.Amount_.GetRef());
+ }
+
+ return RunSimple<Ydb::RateLimiter::V1::RateLimiterService, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse>(
+ std::move(request),
+ &Ydb::RateLimiter::V1::RateLimiterService::Stub::AsyncAcquireResource,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
}
};
@@ -214,8 +214,8 @@ TAsyncDescribeResourceResult TRateLimiterClient::DescribeResource(const TString&
return Impl_->DescribeResource(coordinationNodePath, resourcePath, settings);
}
-TAsyncStatus TRateLimiterClient::AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) {
- return Impl_->AcquireResource(coordinationNodePath, resourcePath, settings);
-}
-
+TAsyncStatus TRateLimiterClient::AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) {
+ return Impl_->AcquireResource(coordinationNodePath, resourcePath, settings);
+}
+
} // namespace NYdb::NRateLimiter
diff --git a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h
index dca90fbe639..f99b7bb80ed 100644
--- a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h
+++ b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h
@@ -79,14 +79,14 @@ private:
TVector<TString> ResourcePaths_;
};
-// Settings for acquire resource request.
-struct TAcquireResourceSettings : public TOperationRequestSettings<TAcquireResourceSettings> {
- using TSelf = TAcquireResourceSettings;
-
- FLUENT_SETTING_OPTIONAL(ui64, Amount);
- FLUENT_SETTING_FLAG(IsUsedAmount);
-};
-
+// Settings for acquire resource request.
+struct TAcquireResourceSettings : public TOperationRequestSettings<TAcquireResourceSettings> {
+ using TSelf = TAcquireResourceSettings;
+
+ FLUENT_SETTING_OPTIONAL(ui64, Amount);
+ FLUENT_SETTING_FLAG(IsUsedAmount);
+};
+
using TAsyncListResourcesResult = NThreading::TFuture<TListResourcesResult>;
// Result for describe resource request.
@@ -160,9 +160,9 @@ public:
// Describe properties of resource in coordination node.
TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {});
- // Acquire resources's units inside a coordination node.
+ // Acquire resources's units inside a coordination node.
TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {});
-
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp
index 480045d29ac..7a551fd25aa 100644
--- a/ydb/services/rate_limiter/grpc_service.cpp
+++ b/ydb/services/rate_limiter/grpc_service.cpp
@@ -63,7 +63,7 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
SETUP_METHOD(DropResource, TEvDropRateLimiterResource);
SETUP_METHOD(ListResources, TEvListRateLimiterResources);
SETUP_METHOD(DescribeResource, TEvDescribeRateLimiterResource);
- SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource);
+ SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource);
#undef SETUP_METHOD
}
diff --git a/ydb/services/rate_limiter/rate_limiter_ut.cpp b/ydb/services/rate_limiter/rate_limiter_ut.cpp
index 4ac05a571c3..9198ef29f6d 100644
--- a/ydb/services/rate_limiter/rate_limiter_ut.cpp
+++ b/ydb/services/rate_limiter/rate_limiter_ut.cpp
@@ -74,10 +74,10 @@ public:
}
void virtual CheckAcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const NYdb::NRateLimiter::TAcquireResourceSettings& settings, NYdb::EStatus expected) {
- const auto acquireResultFuture = RateLimiterClient.AcquireResource(coordinationNodePath, resourcePath, settings);
- ASSERT_STATUS(acquireResultFuture, expected);
+ const auto acquireResultFuture = RateLimiterClient.AcquireResource(coordinationNodePath, resourcePath, settings);
+ ASSERT_STATUS(acquireResultFuture, expected);
}
-
+
static TString CoordinationNodePath;
NYdb::TKikimrWithGrpcAndRootSchema Server;
@@ -309,7 +309,7 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) {
UNIT_ASSERT_VALUES_EQUAL(paths[2], "parent1/child2");
}
}
-
+
std::unique_ptr<TTestSetup> MakeTestSetup(bool useActorApi) {
if (useActorApi) {
return std::make_unique<TTestSetupAcquireActor>();
@@ -318,33 +318,33 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) {
}
void AcquireResourceManyRequired(bool useActorApi) {
- using NYdb::NRateLimiter::TAcquireResourceSettings;
-
+ using NYdb::NRateLimiter::TAcquireResourceSettings;
+
auto setup = MakeTestSetup(useActorApi);
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
- TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
-
+ TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
-
- for (int i = 0; i < 3; ++i) {
+
+ for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
- }
- }
-
+ }
+ }
+
void AcquireResourceManyUsed(bool useActorApi) {
- using NYdb::NRateLimiter::TAcquireResourceSettings;
-
+ using NYdb::NRateLimiter::TAcquireResourceSettings;
+
auto setup = MakeTestSetup(useActorApi);
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
- TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
-
+ TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));
+
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
- for (int i = 0; i < 3; ++i) {
+ for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
- }
+ }
}
Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) {