diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-10-06 18:59:21 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-10-06 18:59:21 +0300 |
commit | 68fd753127853234cd0812750f9bd10c81ab5fa9 (patch) | |
tree | 9d665821850ea8108f34c5d0560c6c21e5d88e53 | |
parent | 3a5282585871fdeaaa5ce17b68da46f9f9597233 (diff) | |
download | ydb-68fd753127853234cd0812750f9bd10c81ab5fa9.tar.gz |
Account RUs consumed by TopicService in common RUs counter
-rw-r--r-- | ydb/core/grpc_services/base/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 28 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_check_actor.h | 8 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 5 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 4 |
5 files changed, 41 insertions, 5 deletions
diff --git a/ydb/core/grpc_services/base/CMakeLists.txt b/ydb/core/grpc_services/base/CMakeLists.txt index 80c3f43f25..0aaf328e0d 100644 --- a/ydb/core/grpc_services/base/CMakeLists.txt +++ b/ydb/core/grpc_services/base/CMakeLists.txt @@ -14,6 +14,7 @@ target_link_libraries(core-grpc_services-base INTERFACE cpp-grpc-server cpp-string_utils-quote ydb-core-base + core-grpc_services-counters ydb-core-grpc_streaming api-protos cpp-client-resources diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index ca6362cca9..4bf9ed201a 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -17,6 +17,7 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue_manager.h> +#include <ydb/core/grpc_services/counters/proxy_counters.h> #include <ydb/core/grpc_streaming/grpc_streaming.h> #include <ydb/core/tx/scheme_board/events.h> #include <ydb/core/base/events.h> @@ -351,6 +352,8 @@ public: virtual bool Validate(TString& error) = 0; // counters + virtual void SetCounters(IGRpcProxyCounters::TPtr counters) = 0; + virtual IGRpcProxyCounters::TPtr GetCounters() const = 0; virtual void UseDatabase(const TString& database) = 0; // rate limiting @@ -519,6 +522,13 @@ public: return true; } + void SetCounters(IGRpcProxyCounters::TPtr) override { + } + + IGRpcProxyCounters::TPtr GetCounters() const override { + return nullptr; + } + void UseDatabase(const TString& database) override { Y_UNUSED(database); } @@ -704,6 +714,14 @@ public: return true; } + void SetCounters(IGRpcProxyCounters::TPtr counters) override { + Counters_ = counters; + } + + IGRpcProxyCounters::TPtr GetCounters() const override { + return Counters_; + } + void UseDatabase(const TString& database) override { Ctx_->UseDatabase(database); } @@ -766,6 +784,7 @@ private: NYql::TIssueManager IssueManager_; TMaybe<NRpcService::TRlPath> RlPath_; bool RlAllowed_; + IGRpcProxyCounters::TPtr Counters_; }; template <typename TDerived> @@ -946,6 +965,14 @@ public: return true; } + void SetCounters(IGRpcProxyCounters::TPtr counters) override { + Counters = counters; + } + + IGRpcProxyCounters::TPtr GetCounters() const override { + return Counters; + } + void UseDatabase(const TString& database) override { Ctx_->UseDatabase(database); } @@ -1109,6 +1136,7 @@ private: ui64 Ru = 0; TRespHook RespHook; TMaybe<NRpcService::TRlPath> RlPath; + IGRpcProxyCounters::TPtr Counters; }; template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived> diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index c5bc34a83b..fe0902b4e4 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -103,6 +103,8 @@ public: Counters_ = WrapGRpcProxyDbCounters(Counters_); } + GrpcRequestBaseCtx_->SetCounters(Counters_); + { auto [error, issue] = CheckConnectRight(); if (error) { @@ -197,10 +199,6 @@ public: ReplyBackAndDie(); } - void SetRlPath(TMaybe<NRpcService::TRlPath>&& rlPath) { - GrpcRequestBaseCtx_->SetRlPath(std::move(rlPath)); - } - STATEFN(DbAccessStateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvPoisonPill, HandlePoison); @@ -301,7 +299,7 @@ private: return SetTokenAndDie(CheckedDatabaseName_); } else { auto actions = NRpcService::MakeRequests(*RlConfig, rlPath.GetRef()); - SetRlPath(std::move(rlPath)); + GrpcRequestBaseCtx_->SetRlPath(std::move(rlPath)); Ydb::RateLimiter::AcquireResourceRequest req; bool hasOnReqAction = false; diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 5b114cf882..95b4206260 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -1809,7 +1809,12 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e return RecheckACL(ctx); case EWakeupTag::RlAllowed: + if (auto counters = Request->GetCounters()) { + counters->AddConsumedRequestUnits(PendingQuota->RequiredQuota); + } + ProcessAnswer(PendingQuota, ctx); + if (!WaitingQuota.empty()) { PendingQuota = WaitingQuota.front(); WaitingQuota.pop_front(); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 44e7bd7610..93716bafd7 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1503,6 +1503,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& return RecheckACL(ctx); case EWakeupTag::RlAllowed: + if (auto counters = Request->GetCounters()) { + counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota); + } + if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { SendRequest(std::move(PendingQuotaRequest), ctx); } else { |