diff options
author | cube <cube@yandex-team.ru> | 2022-02-10 16:52:24 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:24 +0300 |
commit | 94684d13d71865e531792fa1c4c8f243a89fe0e7 (patch) | |
tree | d3948cb3aed1dc873399f0c173086226ed8bcc65 | |
parent | 70f16fb92d56f318d89b7f836104bc9ef6eb0067 (diff) | |
download | ydb-94684d13d71865e531792fa1c4c8f243a89fe0e7.tar.gz |
Restoring authorship annotation for <cube@yandex-team.ru>. Commit 1 of 2.
-rw-r--r-- | ydb/core/base/quoter.h | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_rate_limiter_api.cpp | 896 | ||||
-rw-r--r-- | ydb/core/grpc_services/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/quoter/probes.h | 6 | ||||
-rw-r--r-- | ydb/core/quoter/quoter_service.cpp | 70 | ||||
-rw-r--r-- | ydb/core/quoter/quoter_service_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/quoter/quoter_service_ut.cpp | 68 | ||||
-rw-r--r-- | ydb/public/api/grpc/ydb_rate_limiter_v1.proto | 6 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_rate_limiter.proto | 60 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp | 42 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h | 20 | ||||
-rw-r--r-- | ydb/services/rate_limiter/grpc_service.cpp | 2 | ||||
-rw-r--r-- | ydb/services/rate_limiter/rate_limiter_ut.cpp | 38 |
15 files changed, 615 insertions, 615 deletions
diff --git a/ydb/core/base/quoter.h b/ydb/core/base/quoter.h index 41a19775bc8..d477e52b2aa 100644 --- a/ydb/core/base/quoter.h +++ b/ydb/core/base/quoter.h @@ -40,24 +40,24 @@ struct TEvQuota { const TString Resource; const ui64 Amount; - const bool IsUsedAmount; + const bool IsUsedAmount; TResourceLeaf(const TResourceLeaf&) = default; - TResourceLeaf(ui64 quoterId, ui64 resourceId, ui64 amount, bool isUsedAmount = false) + TResourceLeaf(ui64 quoterId, ui64 resourceId, ui64 amount, bool isUsedAmount = false) : QuoterId(quoterId) , ResourceId(resourceId) , Amount(amount) - , IsUsedAmount(isUsedAmount) + , IsUsedAmount(isUsedAmount) {} - TResourceLeaf(const TString "er, const TString &resource, ui64 amount, bool isUsedAmount = false) + TResourceLeaf(const TString "er, const TString &resource, ui64 amount, bool isUsedAmount = false) : QuoterId(0) , ResourceId(0) , Quoter(quoter) , Resource(resource) , Amount(amount) - , IsUsedAmount(isUsedAmount) + , IsUsedAmount(isUsedAmount) {} }; diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 44b25c4a5f9..8fffd7589f8 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -113,7 +113,7 @@ struct TRpcServices { EvDropRateLimiterResource, EvListRateLimiterResources, EvDescribeRateLimiterResource, - EvAcquireRateLimiterResource, + EvAcquireRateLimiterResource, EvKikhouseCreateSnapshot, EvKikhouseRefreshSnapshot, EvKikhouseDiscardSnapshot, diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index 97315f6e9fb..268f708907e 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -108,7 +108,7 @@ protected: void Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx); void Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx); + void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseCreateSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseRefreshSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index 5579be8bdb3..d926a414e32 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -1,52 +1,52 @@ -#include "grpc_request_proxy.h" - -#include "rpc_calls.h" -#include "rpc_scheme_base.h" -#include "rpc_common.h" - +#include "grpc_request_proxy.h" + +#include "rpc_calls.h" +#include "rpc_scheme_base.h" +#include "rpc_common.h" + #include <ydb/core/base/quoter.h> #include <ydb/core/kesus/tablet/events.h> - -namespace NKikimr::NGRpcService { - -using namespace NActors; -using namespace Ydb; -using namespace NKesus; - -namespace { - -template <typename TDerived, typename TRequest> -class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest> { -public: - using TBase = TRpcOperationRequestActor<TDerived, TRequest>; - + +namespace NKikimr::NGRpcService { + +using namespace NActors; +using namespace Ydb; +using namespace NKesus; + +namespace { + +template <typename TDerived, typename TRequest> +class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest> { +public: + using TBase = TRpcOperationRequestActor<TDerived, TRequest>; + TRateLimiterRequest(IRequestOpCtx* msg, bool trusted = false) : TBase(msg) , TrustedZone(trusted) {} - bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { - if (!ValidateResourcePath(resource.resource_path(), status, issues)) { - return false; - } - - if (resource.type_case() == Ydb::RateLimiter::Resource::TYPE_NOT_SET) { - status = StatusIds::BAD_REQUEST; - issues.AddIssue("No resource properties."); - return false; - } - - return true; - } - - bool ValidateResourcePath(const TString& path, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { - if (path != CanonizeQuoterResourcePath(path)) { - status = StatusIds::BAD_REQUEST; - issues.AddIssue("Bad resource path."); - return false; - } - return true; - } + bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { + if (!ValidateResourcePath(resource.resource_path(), status, issues)) { + return false; + } + + if (resource.type_case() == Ydb::RateLimiter::Resource::TYPE_NOT_SET) { + status = StatusIds::BAD_REQUEST; + issues.AddIssue("No resource properties."); + return false; + } + + return true; + } + + bool ValidateResourcePath(const TString& path, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { + if (path != CanonizeQuoterResourcePath(path)) { + status = StatusIds::BAD_REQUEST; + issues.AddIssue("Bad resource path."); + return false; + } + return true; + } bool ValidateCoordinationNodePath(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { auto databaseName = this->Request_->GetDatabaseName() @@ -68,456 +68,456 @@ protected: private: const bool TrustedZone; }; - -template <class TEvRequest> -class TRateLimiterControlRequest : public TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest> { -public: - using TBase = TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest>; - using TBase::TBase; - - void Bootstrap(const TActorContext& ctx) { - TBase::Bootstrap(ctx); - - this->Become(&TRateLimiterControlRequest::StateFunc); - - Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - NYql::TIssues issues; + +template <class TEvRequest> +class TRateLimiterControlRequest : public TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest> { +public: + using TBase = TRateLimiterRequest<TRateLimiterControlRequest<TEvRequest>, TEvRequest>; + using TBase::TBase; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + + this->Become(&TRateLimiterControlRequest::StateFunc); + + Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + NYql::TIssues issues; if (!this->ValidateCoordinationNodePath(status, issues)) { this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId())); return; } - if (!ValidateRequest(status, issues)) { - this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - ResolveCoordinationPath(); - } - -protected: - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(TEvTabletPipe::TEvClientConnected, Handle); - hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - default: - return TBase::StateFuncBase(ev, ctx); - } - } - - void ResolveCoordinationPath() { + if (!ValidateRequest(status, issues)) { + this->Reply(status, issues, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + ResolveCoordinationPath(); + } + +protected: + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + default: + return TBase::StateFuncBase(ev, ctx); + } + } + + void ResolveCoordinationPath() { TVector<TString> path = NKikimr::SplitPath(this->GetCoordinationNodePath()); - if (path.empty()) { - this->Reply(StatusIds::BAD_REQUEST, "Empty path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - req->ResultSet.emplace_back(); - req->ResultSet.back().Path.swap(path); - req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; - this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(req), 0, 0); - } - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - THolder<NSchemeCache::TSchemeCacheNavigate> navigate = std::move(ev->Get()->Request); - if (navigate->ResultSet.size() != 1 || navigate->ErrorCount > 0) { - this->Reply(StatusIds::INTERNAL_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - const auto& entry = navigate->ResultSet.front(); - if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - this->Reply(StatusIds::SCHEME_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindKesus) { - this->Reply(StatusIds::BAD_REQUEST, "Path is not a coordination node path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - if (!entry.KesusInfo) { - this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node info found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - KesusTabletId = entry.KesusInfo->Description.GetKesusTabletId(); - - if (!KesusTabletId) { - this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node id found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - - CreatePipe(); - - SendRequest(); - } - - NTabletPipe::TClientConfig GetPipeConfig() { - NTabletPipe::TClientConfig cfg; + if (path.empty()) { + this->Reply(StatusIds::BAD_REQUEST, "Empty path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + req->ResultSet.emplace_back(); + req->ResultSet.back().Path.swap(path); + req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(req), 0, 0); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + THolder<NSchemeCache::TSchemeCacheNavigate> navigate = std::move(ev->Get()->Request); + if (navigate->ResultSet.size() != 1 || navigate->ErrorCount > 0) { + this->Reply(StatusIds::INTERNAL_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + const auto& entry = navigate->ResultSet.front(); + if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + this->Reply(StatusIds::SCHEME_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindKesus) { + this->Reply(StatusIds::BAD_REQUEST, "Path is not a coordination node path.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + if (!entry.KesusInfo) { + this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node info found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + KesusTabletId = entry.KesusInfo->Description.GetKesusTabletId(); + + if (!KesusTabletId) { + this->Reply(StatusIds::INTERNAL_ERROR, "Internal error: no coordination node id found.", NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + + CreatePipe(); + + SendRequest(); + } + + NTabletPipe::TClientConfig GetPipeConfig() { + NTabletPipe::TClientConfig cfg; cfg.RetryPolicy = { .RetryLimitCount = 3u }; - return cfg; - } - - void CreatePipe() { - KesusPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KesusTabletId, GetPipeConfig())); - } - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { - if (ev->Get()->Status != NKikimrProto::OK) { - this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId())); - } - } - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { - this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId())); - } - - void ReplyFromKesusError(const NKikimrKesus::TKesusError& err) { - this->Reply(err.GetStatus(), err.GetIssues(), TActivationContext::ActorContextFor(this->SelfId())); - } - - virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0; - - virtual void SendRequest() = 0; - - void PassAway() override { - if (KesusPipeClient) { - NTabletPipe::CloseClient(this->SelfId(), KesusPipeClient); - KesusPipeClient = {}; - } - TBase::PassAway(); - } - -protected: - ui64 KesusTabletId = 0; - TActorId KesusPipeClient; -}; - -static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStreamingQuoterResource& dst) { - dst.SetResourcePath(src.resource_path()); - const auto& srcProps = src.hierarchical_drr(); - auto& props = *dst.MutableHierarhicalDRRResourceConfig(); - props.SetMaxUnitsPerSecond(srcProps.max_units_per_second()); - props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient()); + return cfg; + } + + void CreatePipe() { + KesusPipeClient = this->Register(NTabletPipe::CreateClient(this->SelfId(), KesusTabletId, GetPipeConfig())); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status != NKikimrProto::OK) { + this->Reply(StatusIds::UNAVAILABLE, "Failed to connect to coordination node.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId())); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + this->Reply(StatusIds::UNAVAILABLE, "Connection to coordination node was lost.", NKikimrIssues::TIssuesIds::SHARD_NOT_AVAILABLE, TActivationContext::ActorContextFor(this->SelfId())); + } + + void ReplyFromKesusError(const NKikimrKesus::TKesusError& err) { + this->Reply(err.GetStatus(), err.GetIssues(), TActivationContext::ActorContextFor(this->SelfId())); + } + + virtual bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) = 0; + + virtual void SendRequest() = 0; + + void PassAway() override { + if (KesusPipeClient) { + NTabletPipe::CloseClient(this->SelfId(), KesusPipeClient); + KesusPipeClient = {}; + } + TBase::PassAway(); + } + +protected: + ui64 KesusTabletId = 0; + TActorId KesusPipeClient; +}; + +static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStreamingQuoterResource& dst) { + dst.SetResourcePath(src.resource_path()); + const auto& srcProps = src.hierarchical_drr(); + auto& props = *dst.MutableHierarhicalDRRResourceConfig(); + props.SetMaxUnitsPerSecond(srcProps.max_units_per_second()); + props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient()); props.SetPrefetchCoefficient(srcProps.prefetch_coefficient()); props.SetPrefetchWatermark(srcProps.prefetch_watermark()); -} - -static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) { - dst.set_resource_path(src.GetResourcePath()); - const auto& srcProps = src.GetHierarhicalDRRResourceConfig(); - auto& props = *dst.mutable_hierarchical_drr(); - props.set_max_units_per_second(srcProps.GetMaxUnitsPerSecond()); - props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient()); +} + +static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) { + dst.set_resource_path(src.GetResourcePath()); + const auto& srcProps = src.GetHierarhicalDRRResourceConfig(); + auto& props = *dst.mutable_hierarchical_drr(); + props.set_max_units_per_second(srcProps.GetMaxUnitsPerSecond()); + props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient()); props.set_prefetch_coefficient(srcProps.GetPrefetchCoefficient()); props.set_prefetch_watermark(srcProps.GetPrefetchWatermark()); -} - -class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> { -public: - using TBase = TRateLimiterControlRequest<TEvCreateRateLimiterResource>; - using TBase::TBase; - using TBase::Handle; - - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKesus::TEvAddQuoterResourceResult, Handle); - default: - return TBase::StateFunc(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { +} + +class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> { +public: + using TBase = TRateLimiterControlRequest<TEvCreateRateLimiterResource>; + using TBase::TBase; + using TBase::Handle; + + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKesus::TEvAddQuoterResourceResult, Handle); + default: + return TBase::StateFunc(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { return ValidateResource(GetProtoRequest()->resource(), status, issues); - } - - void SendRequest() override { - Become(&TCreateRateLimiterResourceRPC::StateFunc); - - THolder<TEvKesus::TEvAddQuoterResource> req = MakeHolder<TEvKesus::TEvAddQuoterResource>(); + } + + void SendRequest() override { + Become(&TCreateRateLimiterResourceRPC::StateFunc); + + THolder<TEvKesus::TEvAddQuoterResource> req = MakeHolder<TEvKesus::TEvAddQuoterResource>(); CopyProps(GetProtoRequest()->resource(), *req->Record.MutableResource()); - NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); - } - - void Handle(TEvKesus::TEvAddQuoterResourceResult::TPtr& ev) { - ReplyFromKesusError(ev->Get()->Record.GetError()); - } -}; - -class TAlterRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvAlterRateLimiterResource> { -public: - using TBase = TRateLimiterControlRequest<TEvAlterRateLimiterResource>; - using TBase::TBase; - using TBase::Handle; - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKesus::TEvUpdateQuoterResourceResult, Handle); - default: - return TBase::StateFunc(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { + NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); + } + + void Handle(TEvKesus::TEvAddQuoterResourceResult::TPtr& ev) { + ReplyFromKesusError(ev->Get()->Record.GetError()); + } +}; + +class TAlterRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvAlterRateLimiterResource> { +public: + using TBase = TRateLimiterControlRequest<TEvAlterRateLimiterResource>; + using TBase::TBase; + using TBase::Handle; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKesus::TEvUpdateQuoterResourceResult, Handle); + default: + return TBase::StateFunc(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { return ValidateResource(GetProtoRequest()->resource(), status, issues); - } - - void SendRequest() override { - Become(&TAlterRateLimiterResourceRPC::StateFunc); - - THolder<TEvKesus::TEvUpdateQuoterResource> req = MakeHolder<TEvKesus::TEvUpdateQuoterResource>(); + } + + void SendRequest() override { + Become(&TAlterRateLimiterResourceRPC::StateFunc); + + THolder<TEvKesus::TEvUpdateQuoterResource> req = MakeHolder<TEvKesus::TEvUpdateQuoterResource>(); CopyProps(GetProtoRequest()->resource(), *req->Record.MutableResource()); - NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); - } - - void Handle(TEvKesus::TEvUpdateQuoterResourceResult::TPtr& ev) { - ReplyFromKesusError(ev->Get()->Record.GetError()); - } -}; - -class TDropRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDropRateLimiterResource> { -public: - using TBase = TRateLimiterControlRequest<TEvDropRateLimiterResource>; - using TBase::TBase; - using TBase::Handle; - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKesus::TEvDeleteQuoterResourceResult, Handle); - default: - return TBase::StateFunc(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { + NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); + } + + void Handle(TEvKesus::TEvUpdateQuoterResourceResult::TPtr& ev) { + ReplyFromKesusError(ev->Get()->Record.GetError()); + } +}; + +class TDropRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDropRateLimiterResource> { +public: + using TBase = TRateLimiterControlRequest<TEvDropRateLimiterResource>; + using TBase::TBase; + using TBase::Handle; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKesus::TEvDeleteQuoterResourceResult, Handle); + default: + return TBase::StateFunc(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { return ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues); - } - - void SendRequest() override { - Become(&TDropRateLimiterResourceRPC::StateFunc); - - THolder<TEvKesus::TEvDeleteQuoterResource> req = MakeHolder<TEvKesus::TEvDeleteQuoterResource>(); + } + + void SendRequest() override { + Become(&TDropRateLimiterResourceRPC::StateFunc); + + THolder<TEvKesus::TEvDeleteQuoterResource> req = MakeHolder<TEvKesus::TEvDeleteQuoterResource>(); req->Record.SetResourcePath(GetProtoRequest()->resource_path()); - NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); - } - - void Handle(TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev) { - ReplyFromKesusError(ev->Get()->Record.GetError()); - } -}; - -class TListRateLimiterResourcesRPC : public TRateLimiterControlRequest<TEvListRateLimiterResources> { -public: - using TBase = TRateLimiterControlRequest<TEvListRateLimiterResources>; - using TBase::TBase; - using TBase::Handle; - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle); - default: - return TBase::StateFunc(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { + NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); + } + + void Handle(TEvKesus::TEvDeleteQuoterResourceResult::TPtr& ev) { + ReplyFromKesusError(ev->Get()->Record.GetError()); + } +}; + +class TListRateLimiterResourcesRPC : public TRateLimiterControlRequest<TEvListRateLimiterResources> { +public: + using TBase = TRateLimiterControlRequest<TEvListRateLimiterResources>; + using TBase::TBase; + using TBase::Handle; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle); + default: + return TBase::StateFunc(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { if (const TString& path = GetProtoRequest()->resource_path()) { - return ValidateResourcePath(path, status, issues); - } - return true; - } - - void SendRequest() override { - Become(&TListRateLimiterResourcesRPC::StateFunc); - - THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>(); + return ValidateResourcePath(path, status, issues); + } + return true; + } + + void SendRequest() override { + Become(&TListRateLimiterResourcesRPC::StateFunc); + + THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>(); if (const TString& path = GetProtoRequest()->resource_path()) { - req->Record.AddResourcePaths(path); - } + req->Record.AddResourcePaths(path); + } req->Record.SetRecursive(GetProtoRequest()->recursive()); - NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); - } - - void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) { - const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError(); - if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) { - Ydb::RateLimiter::ListResourcesResult result; - for (const auto& resource : ev->Get()->Record.GetResources()) { - result.add_resource_paths(resource.GetResourcePath()); - } - Request_->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } else { - ReplyFromKesusError(kesusError); - } - } -}; - -class TDescribeRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDescribeRateLimiterResource> { -public: - using TBase = TRateLimiterControlRequest<TEvDescribeRateLimiterResource>; - using TBase::TBase; - using TBase::Handle; - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle); - default: - return TBase::StateFunc(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { + NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); + } + + void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) { + const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError(); + if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) { + Ydb::RateLimiter::ListResourcesResult result; + for (const auto& resource : ev->Get()->Record.GetResources()) { + result.add_resource_paths(resource.GetResourcePath()); + } + Request_->SendResult(result, Ydb::StatusIds::SUCCESS); + PassAway(); + } else { + ReplyFromKesusError(kesusError); + } + } +}; + +class TDescribeRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvDescribeRateLimiterResource> { +public: + using TBase = TRateLimiterControlRequest<TEvDescribeRateLimiterResource>; + using TBase::TBase; + using TBase::Handle; + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKesus::TEvDescribeQuoterResourcesResult, Handle); + default: + return TBase::StateFunc(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) override { return ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues); - } - - void SendRequest() override { - Become(&TDescribeRateLimiterResourceRPC::StateFunc); - - THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>(); + } + + void SendRequest() override { + Become(&TDescribeRateLimiterResourceRPC::StateFunc); + + THolder<TEvKesus::TEvDescribeQuoterResources> req = MakeHolder<TEvKesus::TEvDescribeQuoterResources>(); req->Record.AddResourcePaths(GetProtoRequest()->resource_path()); - NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); - } - - void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) { - const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError(); - if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) { - Ydb::RateLimiter::DescribeResourceResult result; - if (ev->Get()->Record.ResourcesSize() == 0) { - this->Reply(StatusIds::INTERNAL_ERROR, "No resource properties found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TActivationContext::ActorContextFor(this->SelfId())); - return; - } - CopyProps(ev->Get()->Record.GetResources(0), *result.mutable_resource()); - Request_->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } else { - ReplyFromKesusError(kesusError); - } - } -}; - -class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource> { -public: - using TBase = TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource>; - using TBase::TBase; - - void Bootstrap(const TActorContext& ctx) { - TBase::Bootstrap(ctx); - - Become(&TAcquireRateLimiterResourceRPC::StateFunc); - - Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - NYql::TIssues issues; + NTabletPipe::SendData(SelfId(), KesusPipeClient, req.Release(), 0); + } + + void Handle(TEvKesus::TEvDescribeQuoterResourcesResult::TPtr& ev) { + const NKikimrKesus::TKesusError& kesusError = ev->Get()->Record.GetError(); + if (kesusError.GetStatus() == Ydb::StatusIds::SUCCESS) { + Ydb::RateLimiter::DescribeResourceResult result; + if (ev->Get()->Record.ResourcesSize() == 0) { + this->Reply(StatusIds::INTERNAL_ERROR, "No resource properties found.", NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TActivationContext::ActorContextFor(this->SelfId())); + return; + } + CopyProps(ev->Get()->Record.GetResources(0), *result.mutable_resource()); + Request_->SendResult(result, Ydb::StatusIds::SUCCESS); + PassAway(); + } else { + ReplyFromKesusError(kesusError); + } + } +}; + +class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource> { +public: + using TBase = TRateLimiterRequest<TAcquireRateLimiterResourceRPC, TEvAcquireRateLimiterResource>; + using TBase::TBase; + + void Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + + Become(&TAcquireRateLimiterResourceRPC::StateFunc); + + Ydb::StatusIds::StatusCode status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + NYql::TIssues issues; if (!ValidateCoordinationNodePath(status, issues)) { Reply(status, issues, TActivationContext::AsActorContext()); return; } - if (!ValidateRequest(status, issues)) { - Reply(status, issues, TActivationContext::AsActorContext()); - return; - } - - SendRequest(); - } - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvQuota::TEvClearance, Handle); - default: - return TBase::StateFuncBase(ev, ctx); - } - } - - bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { + if (!ValidateRequest(status, issues)) { + Reply(status, issues, TActivationContext::AsActorContext()); + return; + } + + SendRequest(); + } + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvQuota::TEvClearance, Handle); + default: + return TBase::StateFuncBase(ev, ctx); + } + } + + bool ValidateRequest(Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { if (!ValidateResourcePath(GetProtoRequest()->resource_path(), status, issues)) { - return false; - } - + return false; + } + if (GetProtoRequest()->units_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::UNITS_NOT_SET) { - return false; - } - - return true; - } - - void SendRequest() { - Become(&TAcquireRateLimiterResourceRPC::StateFunc); - + return false; + } + + return true; + } + + void SendRequest() { + Become(&TAcquireRateLimiterResourceRPC::StateFunc); + if (GetProtoRequest()->units_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::kRequired) { - SendLeaf( + SendLeaf( TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path(), GetProtoRequest()->required())); - return; + return; } - - SendLeaf( + + SendLeaf( TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path(), GetProtoRequest()->used(), - true)); - } - - void SendLeaf(const TEvQuota::TResourceLeaf& leaf) { - Send(MakeQuoterServiceID(), + true)); + } + + void SendLeaf(const TEvQuota::TResourceLeaf& leaf) { + Send(MakeQuoterServiceID(), new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, GetOperationTimeout()), 0, 0); - } - - void Handle(TEvQuota::TEvClearance::TPtr& ev) { - switch (ev->Get()->Result) { - case TEvQuota::TEvClearance::EResult::Success: - Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext()); - break; - case TEvQuota::TEvClearance::EResult::UnknownResource: - Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext()); - break; - case TEvQuota::TEvClearance::EResult::Deadline: - Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext()); - break; - default: - Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); - } - } -}; - -} // namespace - -void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release())); -} - -void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release())); -} - -void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release())); -} - -void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release())); -} - -void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release())); -} - -void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release())); -} - + } + + void Handle(TEvQuota::TEvClearance::TPtr& ev) { + switch (ev->Get()->Result) { + case TEvQuota::TEvClearance::EResult::Success: + Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext()); + break; + case TEvQuota::TEvClearance::EResult::UnknownResource: + Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext()); + break; + case TEvQuota::TEvClearance::EResult::Deadline: + Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext()); + break; + default: + Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + } + } +}; + +} // namespace + +void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release())); +} + +void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release())); +} + +void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release())); +} + +void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release())); +} + +void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release())); +} + +void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) { + ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release())); +} + template<> IActor* TEvAcquireRateLimiterResource::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { return new TAcquireRateLimiterResourceRPC(msg, true); } -} // namespace NKikimr::NGRpcService +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 05b156bf043..7ae3d5cb2d9 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -58,7 +58,7 @@ SRCS( rpc_modify_permissions.cpp rpc_monitoring.cpp rpc_prepare_data_query.cpp - rpc_rate_limiter_api.cpp + rpc_rate_limiter_api.cpp rpc_read_columns.cpp rpc_read_table.cpp rpc_remove_directory.cpp diff --git a/ydb/core/quoter/probes.h b/ydb/core/quoter/probes.h index c5ef9aca125..ee9c14f6eb9 100644 --- a/ydb/core/quoter/probes.h +++ b/ydb/core/quoter/probes.h @@ -24,9 +24,9 @@ PROBE(StartCharging, GROUPS("QuoterService", "ClientRequest"), \ TYPES(TString, TString, ui64, ui64), \ NAMES("quoter", "resource", "quoterId", "resourceId")) \ - PROBE(Charge, GROUPS("QuoterService", "ClientRequest"), \ - TYPES(TString, TString, ui64, ui64), \ - NAMES("quoter", "resource", "quoterId", "resourceId")) \ + PROBE(Charge, GROUPS("QuoterService", "ClientRequest"), \ + TYPES(TString, TString, ui64, ui64), \ + NAMES("quoter", "resource", "quoterId", "resourceId")) \ \ PROBE(AllocateResource, GROUPS("QuoterService", "Resource"), \ TYPES(TString, TString, ui64, ui64, ui64, double, ui64, double, double, double), \ diff --git a/ydb/core/quoter/quoter_service.cpp b/ydb/core/quoter/quoter_service.cpp index b9a1b629f51..dea19f4fd9e 100644 --- a/ydb/core/quoter/quoter_service.cpp +++ b/ydb/core/quoter/quoter_service.cpp @@ -147,7 +147,7 @@ TResourceLeaf& TResState::Get(ui32 idx) { return Leafs[idx]; } -ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx) { +ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx) { ui32 idx; if (Unused) { idx = Unused.back(); @@ -160,7 +160,7 @@ ui32 TResState::Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui auto &x = Leafs[idx]; x.Resource = resource; x.Amount = amount; - x.IsUsedAmount = isUsedAmount; + x.IsUsedAmount = isUsedAmount; x.RequestIdx = requestIdx; Y_VERIFY_DEBUG(x.NextInWaitQueue == Max<ui32>()); @@ -232,12 +232,12 @@ void TResource::StopStarvation(TInstant now) { TDuration TResource::Charge(TRequest& request, TResourceLeaf& leaf, TInstant now) { MarkStartedCharging(request, leaf, now); - if (leaf.IsUsedAmount) { - ChargeUsedAmount(leaf.Amount, now); - Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds()); - return TDuration::Zero(); - } - + if (leaf.IsUsedAmount) { + ChargeUsedAmount(leaf.Amount, now); + Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds()); + return TDuration::Zero(); + } + const TDuration result = Charge(leaf.Amount, now); if (result == TDuration::Zero()) { Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds()); @@ -245,24 +245,24 @@ TDuration TResource::Charge(TRequest& request, TResourceLeaf& leaf, TInstant now return result; } -void TResource::ChargeUsedAmount(double amount, TInstant now) { - BLOG_T("ChargeUsedAmount \"" << Resource << "\" for " << amount - << ". Balance: " << Balance - << ". FreeBalance: " << FreeBalance - << ". Now: " << now); - LastAllocated = now; - FreeBalance -= amount; - Balance -= amount; - AmountConsumed += amount; +void TResource::ChargeUsedAmount(double amount, TInstant now) { + BLOG_T("ChargeUsedAmount \"" << Resource << "\" for " << amount + << ". Balance: " << Balance + << ". FreeBalance: " << FreeBalance + << ". Now: " << now); + LastAllocated = now; + FreeBalance -= amount; + Balance -= amount; + AmountConsumed += amount; History.Add(now, amount); - Counters.Consumed->Add(static_cast<i64>(amount)); - if (Balance >= 0.0) { - StopStarvation(now); - return; - } - StartStarvation(now); -} - + Counters.Consumed->Add(static_cast<i64>(amount)); + if (Balance >= 0.0) { + StopStarvation(now); + return; + } + StartStarvation(now); +} + TDuration TResource::Charge(double amount, TInstant now) { // Zero - charged // Max - not in current tick (or resource already queued) @@ -587,7 +587,7 @@ TQuoterService::EInitLeafStatus TQuoterService::InitResourceLeaf(const TEvQuota: quoter->WaitingQueueResolve.emplace(reqIdx); // todo: make generic 'leaf for resolve' helper - const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx); + const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx); TResourceLeaf& resLeaf = ResState.Get(resLeafIdx); resLeaf.QuoterId = quoterId; @@ -618,7 +618,7 @@ TQuoterService::EInitLeafStatus TQuoterService::InitResourceLeaf(const TEvQuota: auto rIndxIt = quoter->WaitingResource.emplace(leaf.Resource, TSet<ui32>()); rIndxIt.first->second.emplace(reqIdx); - const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx); + const ui32 resLeafIdx = ResState.Allocate(nullptr, leaf.Amount, leaf.IsUsedAmount, reqIdx); TResourceLeaf& resLeaf = ResState.Get(resLeafIdx); resLeaf.QuoterId = quoterId; @@ -693,13 +693,13 @@ TQuoterService::EInitLeafStatus TQuoterService::TryCharge(TResource& quores, ui6 const TInstant now = TActivationContext::Now(); bool startedCharge = false; LWTRACK(ResourceQueueState, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId, quores.QueueSize, quores.QueueWeight); - if (leaf.IsUsedAmount) { - quores.ChargeUsedAmount(leaf.Amount, now); - LWTRACK(Charge, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId); - quores.Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds()); - return EInitLeafStatus::Charged; - } - + if (leaf.IsUsedAmount) { + quores.ChargeUsedAmount(leaf.Amount, now); + LWTRACK(Charge, request.Orbit, leaf.Quoter, leaf.Resource, leaf.QuoterId, leaf.ResourceId); + quores.Counters.RequestTime->Collect((now - request.StartTime).MilliSeconds()); + return EInitLeafStatus::Charged; + } + if (quores.QueueSize == 0) { startedCharge = true; const TDuration delay = quores.Charge(leaf.Amount, now); @@ -714,7 +714,7 @@ TQuoterService::EInitLeafStatus TQuoterService::TryCharge(TResource& quores, ui6 } // need wait entry for resource - const ui32 resLeafIdx = ResState.Allocate(&quores, leaf.Amount, leaf.IsUsedAmount, reqIdx); + const ui32 resLeafIdx = ResState.Allocate(&quores, leaf.Amount, leaf.IsUsedAmount, reqIdx); TResourceLeaf& resLeaf = ResState.Get(resLeafIdx); resLeaf.State = EResourceState::Wait; diff --git a/ydb/core/quoter/quoter_service_impl.h b/ydb/core/quoter/quoter_service_impl.h index d27ea134e19..114ea2c091b 100644 --- a/ydb/core/quoter/quoter_service_impl.h +++ b/ydb/core/quoter/quoter_service_impl.h @@ -86,7 +86,7 @@ enum class EResourceState { struct TResourceLeaf { TResource *Resource = nullptr; ui64 Amount = Max<ui64>(); - bool IsUsedAmount = false; + bool IsUsedAmount = false; ui32 RequestIdx = Max<ui32>(); @@ -113,7 +113,7 @@ private: TVector<ui32> Unused; public: TResourceLeaf& Get(ui32 idx); - ui32 Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx); + ui32 Allocate(TResource *resource, ui64 amount, bool isUsedAmount, ui32 requestIdx); void FreeChain(ui32 headIdx); }; @@ -275,7 +275,7 @@ class TQuoterService : public TActorBootstrapped<TQuoterService> { EInitLeafStatus InitSystemLeaf(const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx); EInitLeafStatus InitResourceLeaf(const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx); EInitLeafStatus TryCharge(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request, ui32 reqIdx); - EInitLeafStatus NotifyUsed(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request); + EInitLeafStatus NotifyUsed(TResource& quores, ui64 quoterId, ui64 resourceId, const TEvQuota::TResourceLeaf &leaf, TRequest &request); void MarkScheduleAllocation(TResource& quores, TDuration delay, TInstant now); void InitialRequestProcessing(TEvQuota::TEvRequest::TPtr &ev, const ui32 reqIdx); diff --git a/ydb/core/quoter/quoter_service_ut.cpp b/ydb/core/quoter/quoter_service_ut.cpp index 88c74ca3fb1..f169bb6a074 100644 --- a/ydb/core/quoter/quoter_service_ut.cpp +++ b/ydb/core/quoter/quoter_service_ut.cpp @@ -71,42 +71,42 @@ Y_UNIT_TEST_SUITE(TQuoterServiceTest) { UNIT_ASSERT_VALUES_EQUAL(ev->Cookie, 300 + i); } } - - { - auto resId = TEvQuota::TResourceLeaf::MakeTaggedRateRes(1, 1); - - runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, - new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { - TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1) - }, TDuration::Max()))); - - THolder<TEvQuota::TEvClearance> reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); + + { + auto resId = TEvQuota::TResourceLeaf::MakeTaggedRateRes(1, 1); + + runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, + new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { + TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1) + }, TDuration::Max()))); + + THolder<TEvQuota::TEvClearance> reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success); - - runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, - new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { - TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true) - }, TDuration::Seconds(1)))); - - reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); + + runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, + new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { + TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true) + }, TDuration::Seconds(1)))); + + reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); + UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success); + + runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, + new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { + TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000) + }, TDuration::Seconds(1)))); + + reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); + UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Deadline); + + runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, + new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { + TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true) + }, TDuration::Seconds(1)))); + + reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success); - - runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, - new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { - TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000) - }, TDuration::Seconds(1)))); - - reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); - UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Deadline); - - runtime->Send(new IEventHandle(MakeQuoterServiceID(), sender, - new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { - TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, resId, 1000, true) - }, TDuration::Seconds(1)))); - - reply = runtime->GrabEdgeEvent<TEvQuota::TEvClearance>(); - UNIT_ASSERT(reply->Result == TEvQuota::TEvClearance::EResult::Success); - } + } } #if defined(OPTIMIZED) diff --git a/ydb/public/api/grpc/ydb_rate_limiter_v1.proto b/ydb/public/api/grpc/ydb_rate_limiter_v1.proto index be477d1dd59..b8431847285 100644 --- a/ydb/public/api/grpc/ydb_rate_limiter_v1.proto +++ b/ydb/public/api/grpc/ydb_rate_limiter_v1.proto @@ -29,7 +29,7 @@ service RateLimiterService { // Describe properties of resource in coordination node. rpc DescribeResource(DescribeResourceRequest) returns (DescribeResourceResponse); - - // Take units for usage of a resource in coordination node. - rpc AcquireResource(AcquireResourceRequest) returns (AcquireResourceResponse); + + // Take units for usage of a resource in coordination node. + rpc AcquireResource(AcquireResourceRequest) returns (AcquireResourceResponse); } diff --git a/ydb/public/api/protos/ydb_rate_limiter.proto b/ydb/public/api/protos/ydb_rate_limiter.proto index c18430c32b6..d44e3b5f966 100644 --- a/ydb/public/api/protos/ydb_rate_limiter.proto +++ b/ydb/public/api/protos/ydb_rate_limiter.proto @@ -175,33 +175,33 @@ message DescribeResourceResponse { message DescribeResourceResult { Resource resource = 1; } - -// -// AcquireResource method. -// - -message AcquireResourceRequest { - Ydb.Operations.OperationParams operation_params = 1; - - // Path of a coordination node. - string coordination_node_path = 2; - - // Path of resource inside a coordination node. - string resource_path = 3; - - oneof units { - // Request resource's units for usage. - uint64 required = 4; - - // Actually used resource's units by client. - uint64 used = 5; - } -} - -message AcquireResourceResponse { - // Holds AcquireResourceResult in case of successful call. - Ydb.Operations.Operation operation = 1; -} - -message AcquireResourceResult { -} + +// +// AcquireResource method. +// + +message AcquireResourceRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Path of a coordination node. + string coordination_node_path = 2; + + // Path of resource inside a coordination node. + string resource_path = 3; + + oneof units { + // Request resource's units for usage. + uint64 required = 4; + + // Actually used resource's units by client. + uint64 used = 5; + } +} + +message AcquireResourceResponse { + // Holds AcquireResourceResult in case of successful call. + Ydb.Operations.Operation operation = 1; +} + +message AcquireResourceResult { +} diff --git a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp index bea557812c6..23e8942221c 100644 --- a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp +++ b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.cpp @@ -169,23 +169,23 @@ public: return promise.GetFuture(); } - - TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) { - auto request = MakeOperationRequest<Ydb::RateLimiter::AcquireResourceRequest>(settings); - request.set_coordination_node_path(coordinationNodePath); - request.set_resource_path(resourcePath); - - if (settings.IsUsedAmount_) { - request.set_used(settings.Amount_.GetRef()); - } else { - request.set_required(settings.Amount_.GetRef()); - } - - return RunSimple<Ydb::RateLimiter::V1::RateLimiterService, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse>( - std::move(request), - &Ydb::RateLimiter::V1::RateLimiterService::Stub::AsyncAcquireResource, - TRpcRequestSettings::Make(settings), - settings.ClientTimeout_); + + TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) { + auto request = MakeOperationRequest<Ydb::RateLimiter::AcquireResourceRequest>(settings); + request.set_coordination_node_path(coordinationNodePath); + request.set_resource_path(resourcePath); + + if (settings.IsUsedAmount_) { + request.set_used(settings.Amount_.GetRef()); + } else { + request.set_required(settings.Amount_.GetRef()); + } + + return RunSimple<Ydb::RateLimiter::V1::RateLimiterService, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse>( + std::move(request), + &Ydb::RateLimiter::V1::RateLimiterService::Stub::AsyncAcquireResource, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); } }; @@ -214,8 +214,8 @@ TAsyncDescribeResourceResult TRateLimiterClient::DescribeResource(const TString& return Impl_->DescribeResource(coordinationNodePath, resourcePath, settings); } -TAsyncStatus TRateLimiterClient::AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) { - return Impl_->AcquireResource(coordinationNodePath, resourcePath, settings); -} - +TAsyncStatus TRateLimiterClient::AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& settings) { + return Impl_->AcquireResource(coordinationNodePath, resourcePath, settings); +} + } // namespace NYdb::NRateLimiter diff --git a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h index dca90fbe639..f99b7bb80ed 100644 --- a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h +++ b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h @@ -79,14 +79,14 @@ private: TVector<TString> ResourcePaths_; }; -// Settings for acquire resource request. -struct TAcquireResourceSettings : public TOperationRequestSettings<TAcquireResourceSettings> { - using TSelf = TAcquireResourceSettings; - - FLUENT_SETTING_OPTIONAL(ui64, Amount); - FLUENT_SETTING_FLAG(IsUsedAmount); -}; - +// Settings for acquire resource request. +struct TAcquireResourceSettings : public TOperationRequestSettings<TAcquireResourceSettings> { + using TSelf = TAcquireResourceSettings; + + FLUENT_SETTING_OPTIONAL(ui64, Amount); + FLUENT_SETTING_FLAG(IsUsedAmount); +}; + using TAsyncListResourcesResult = NThreading::TFuture<TListResourcesResult>; // Result for describe resource request. @@ -160,9 +160,9 @@ public: // Describe properties of resource in coordination node. TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {}); - // Acquire resources's units inside a coordination node. + // Acquire resources's units inside a coordination node. TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {}); - + private: class TImpl; std::shared_ptr<TImpl> Impl_; diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp index 480045d29ac..7a551fd25aa 100644 --- a/ydb/services/rate_limiter/grpc_service.cpp +++ b/ydb/services/rate_limiter/grpc_service.cpp @@ -63,7 +63,7 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { SETUP_METHOD(DropResource, TEvDropRateLimiterResource); SETUP_METHOD(ListResources, TEvListRateLimiterResources); SETUP_METHOD(DescribeResource, TEvDescribeRateLimiterResource); - SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource); + SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource); #undef SETUP_METHOD } diff --git a/ydb/services/rate_limiter/rate_limiter_ut.cpp b/ydb/services/rate_limiter/rate_limiter_ut.cpp index 4ac05a571c3..9198ef29f6d 100644 --- a/ydb/services/rate_limiter/rate_limiter_ut.cpp +++ b/ydb/services/rate_limiter/rate_limiter_ut.cpp @@ -74,10 +74,10 @@ public: } void virtual CheckAcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const NYdb::NRateLimiter::TAcquireResourceSettings& settings, NYdb::EStatus expected) { - const auto acquireResultFuture = RateLimiterClient.AcquireResource(coordinationNodePath, resourcePath, settings); - ASSERT_STATUS(acquireResultFuture, expected); + const auto acquireResultFuture = RateLimiterClient.AcquireResource(coordinationNodePath, resourcePath, settings); + ASSERT_STATUS(acquireResultFuture, expected); } - + static TString CoordinationNodePath; NYdb::TKikimrWithGrpcAndRootSchema Server; @@ -309,7 +309,7 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) { UNIT_ASSERT_VALUES_EQUAL(paths[2], "parent1/child2"); } } - + std::unique_ptr<TTestSetup> MakeTestSetup(bool useActorApi) { if (useActorApi) { return std::make_unique<TTestSetupAcquireActor>(); @@ -318,33 +318,33 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) { } void AcquireResourceManyRequired(bool useActorApi) { - using NYdb::NRateLimiter::TAcquireResourceSettings; - + using NYdb::NRateLimiter::TAcquireResourceSettings; + auto setup = MakeTestSetup(useActorApi); ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res", - TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); - + TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); - - for (int i = 0; i < 3; ++i) { + + for (int i = 0; i < 3; ++i) { setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT); setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); - } - } - + } + } + void AcquireResourceManyUsed(bool useActorApi) { - using NYdb::NRateLimiter::TAcquireResourceSettings; - + using NYdb::NRateLimiter::TAcquireResourceSettings; + auto setup = MakeTestSetup(useActorApi); ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res", - TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); - + TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 3; ++i) { setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT); setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); - } + } } Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) { |