aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-10-06 18:59:21 +0300
committerilnaz <ilnaz@ydb.tech>2022-10-06 18:59:21 +0300
commit68fd753127853234cd0812750f9bd10c81ab5fa9 (patch)
tree9d665821850ea8108f34c5d0560c6c21e5d88e53
parent3a5282585871fdeaaa5ce17b68da46f9f9597233 (diff)
downloadydb-68fd753127853234cd0812750f9bd10c81ab5fa9.tar.gz
Account RUs consumed by TopicService in common RUs counter
-rw-r--r--ydb/core/grpc_services/base/CMakeLists.txt1
-rw-r--r--ydb/core/grpc_services/base/base.h28
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h8
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp5
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp4
5 files changed, 41 insertions, 5 deletions
diff --git a/ydb/core/grpc_services/base/CMakeLists.txt b/ydb/core/grpc_services/base/CMakeLists.txt
index 80c3f43f25..0aaf328e0d 100644
--- a/ydb/core/grpc_services/base/CMakeLists.txt
+++ b/ydb/core/grpc_services/base/CMakeLists.txt
@@ -14,6 +14,7 @@ target_link_libraries(core-grpc_services-base INTERFACE
cpp-grpc-server
cpp-string_utils-quote
ydb-core-base
+ core-grpc_services-counters
ydb-core-grpc_streaming
api-protos
cpp-client-resources
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index ca6362cca9..4bf9ed201a 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -17,6 +17,7 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue_manager.h>
+#include <ydb/core/grpc_services/counters/proxy_counters.h>
#include <ydb/core/grpc_streaming/grpc_streaming.h>
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/base/events.h>
@@ -351,6 +352,8 @@ public:
virtual bool Validate(TString& error) = 0;
// counters
+ virtual void SetCounters(IGRpcProxyCounters::TPtr counters) = 0;
+ virtual IGRpcProxyCounters::TPtr GetCounters() const = 0;
virtual void UseDatabase(const TString& database) = 0;
// rate limiting
@@ -519,6 +522,13 @@ public:
return true;
}
+ void SetCounters(IGRpcProxyCounters::TPtr) override {
+ }
+
+ IGRpcProxyCounters::TPtr GetCounters() const override {
+ return nullptr;
+ }
+
void UseDatabase(const TString& database) override {
Y_UNUSED(database);
}
@@ -704,6 +714,14 @@ public:
return true;
}
+ void SetCounters(IGRpcProxyCounters::TPtr counters) override {
+ Counters_ = counters;
+ }
+
+ IGRpcProxyCounters::TPtr GetCounters() const override {
+ return Counters_;
+ }
+
void UseDatabase(const TString& database) override {
Ctx_->UseDatabase(database);
}
@@ -766,6 +784,7 @@ private:
NYql::TIssueManager IssueManager_;
TMaybe<NRpcService::TRlPath> RlPath_;
bool RlAllowed_;
+ IGRpcProxyCounters::TPtr Counters_;
};
template <typename TDerived>
@@ -946,6 +965,14 @@ public:
return true;
}
+ void SetCounters(IGRpcProxyCounters::TPtr counters) override {
+ Counters = counters;
+ }
+
+ IGRpcProxyCounters::TPtr GetCounters() const override {
+ return Counters;
+ }
+
void UseDatabase(const TString& database) override {
Ctx_->UseDatabase(database);
}
@@ -1109,6 +1136,7 @@ private:
ui64 Ru = 0;
TRespHook RespHook;
TMaybe<NRpcService::TRlPath> RlPath;
+ IGRpcProxyCounters::TPtr Counters;
};
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index c5bc34a83b..fe0902b4e4 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -103,6 +103,8 @@ public:
Counters_ = WrapGRpcProxyDbCounters(Counters_);
}
+ GrpcRequestBaseCtx_->SetCounters(Counters_);
+
{
auto [error, issue] = CheckConnectRight();
if (error) {
@@ -197,10 +199,6 @@ public:
ReplyBackAndDie();
}
- void SetRlPath(TMaybe<NRpcService::TRlPath>&& rlPath) {
- GrpcRequestBaseCtx_->SetRlPath(std::move(rlPath));
- }
-
STATEFN(DbAccessStateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvPoisonPill, HandlePoison);
@@ -301,7 +299,7 @@ private:
return SetTokenAndDie(CheckedDatabaseName_);
} else {
auto actions = NRpcService::MakeRequests(*RlConfig, rlPath.GetRef());
- SetRlPath(std::move(rlPath));
+ GrpcRequestBaseCtx_->SetRlPath(std::move(rlPath));
Ydb::RateLimiter::AcquireResourceRequest req;
bool hasOnReqAction = false;
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 5b114cf882..95b4206260 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1809,7 +1809,12 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
return RecheckACL(ctx);
case EWakeupTag::RlAllowed:
+ if (auto counters = Request->GetCounters()) {
+ counters->AddConsumedRequestUnits(PendingQuota->RequiredQuota);
+ }
+
ProcessAnswer(PendingQuota, ctx);
+
if (!WaitingQuota.empty()) {
PendingQuota = WaitingQuota.front();
WaitingQuota.pop_front();
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 44e7bd7610..93716bafd7 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1503,6 +1503,10 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
return RecheckACL(ctx);
case EWakeupTag::RlAllowed:
+ if (auto counters = Request->GetCounters()) {
+ counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota);
+ }
+
if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
SendRequest(std::move(PendingQuotaRequest), ctx);
} else {