aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-10-17 11:47:49 +0200
committerGitHub <noreply@github.com>2024-10-17 12:47:49 +0300
commitd0cfbf306137a4f30fbcfe98f9e270b060c57905 (patch)
tree50c91855c8a612265f5daf8991ce3b97ec40a254
parenta71cc84f0c2f4321af417de000484193208eec01 (diff)
downloadydb-d0cfbf306137a4f30fbcfe98f9e270b060c57905.tar.gz
[refactoring] Get rid of grpc global request limiter copy-paste code. (#10518)
-rw-r--r--ydb/core/client/server/grpc_server.h6
-rw-r--r--ydb/core/grpc_services/base/base_service.h15
-rw-r--r--ydb/core/grpc_streaming/grpc_streaming_ut.cpp9
-rw-r--r--ydb/core/http_proxy/grpc_service.cpp13
-rw-r--r--ydb/core/http_proxy/grpc_service.h5
-rw-r--r--ydb/library/grpc/server/grpc_request.h10
-rw-r--r--ydb/library/grpc/server/grpc_server.cpp14
-rw-r--r--ydb/library/grpc/server/grpc_server.h10
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp13
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.h5
-rw-r--r--ydb/services/deprecated/persqueue_v0/persqueue.cpp12
-rw-r--r--ydb/services/deprecated/persqueue_v0/persqueue.h5
-rw-r--r--ydb/services/fq/grpc_service.cpp13
-rw-r--r--ydb/services/fq/grpc_service.h5
-rw-r--r--ydb/services/fq/private_grpc.cpp13
-rw-r--r--ydb/services/fq/private_grpc.h5
-rw-r--r--ydb/services/fq/ydb_over_fq.cpp13
-rw-r--r--ydb/services/fq/ydb_over_fq.h11
-rw-r--r--ydb/services/keyvalue/grpc_service.cpp12
-rw-r--r--ydb/services/keyvalue/grpc_service.h6
-rw-r--r--ydb/services/local_discovery/grpc_service.cpp13
-rw-r--r--ydb/services/local_discovery/grpc_service.h5
-rw-r--r--ydb/services/persqueue_cluster_discovery/grpc_service.h6
-rw-r--r--ydb/services/rate_limiter/grpc_service.cpp12
-rw-r--r--ydb/services/rate_limiter/grpc_service.h5
-rw-r--r--ydb/services/ydb/ydb_dummy.cpp13
-rw-r--r--ydb/services/ydb/ydb_dummy.h5
27 files changed, 38 insertions, 216 deletions
diff --git a/ydb/core/client/server/grpc_server.h b/ydb/core/client/server/grpc_server.h
index ece9eb54e1..b7bf2cd3ac 100644
--- a/ydb/core/client/server/grpc_server.h
+++ b/ydb/core/client/server/grpc_server.h
@@ -57,13 +57,13 @@ public:
TGRpcService();
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
+ void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override final;
NThreading::TFuture<void> Prepare(NActors::TActorSystem* system, const NActors::TActorId& pqMeta, const NActors::TActorId& msgBusProxy, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
void Start();
- bool IncRequest();
- void DecRequest();
+ bool IncRequest() override final;
+ void DecRequest() override final;
i64 GetCurrentInFlight() const;
private:
diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h
index 27f16fb5f0..bd5d7e0015 100644
--- a/ydb/core/grpc_services/base/base_service.h
+++ b/ydb/core/grpc_services/base/base_service.h
@@ -73,19 +73,6 @@ public:
SetupIncomingRequests(std::move(logger));
}
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override {
- Limiter_ = limiter;
- }
-
- bool IncRequest() {
- return Limiter_->Inc();
- }
-
- void DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
- }
-
protected:
virtual void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) = 0;
@@ -97,8 +84,6 @@ protected:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
const NActors::TActorId GRpcRequestProxyId_;
const TVector<NActors::TActorId> GRpcProxies_;
-
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
}
diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
index d01f9b36d5..d6c723a9b2 100644
--- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
+++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
@@ -64,15 +64,15 @@ public:
getCounterBlock("streaming", "Session"));
}
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override {
- Limiter = limiter;
+ void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter*) override {
+ // nothing
}
- bool IncRequest() {
+ bool IncRequest() override {
return true;
}
- void DecRequest() {
+ void DecRequest() override {
// nothing
}
@@ -81,7 +81,6 @@ private:
TIntrusivePtr<::NMonitoring::TDynamicCounters> const Counters;
grpc::ServerCompletionQueue* CQ = nullptr;
- NYdbGrpc::TGlobalLimiter* Limiter = nullptr;
};
template<class TImplActor>
diff --git a/ydb/core/http_proxy/grpc_service.cpp b/ydb/core/http_proxy/grpc_service.cpp
index 00f59ca06e..b310b55f15 100644
--- a/ydb/core/http_proxy/grpc_service.cpp
+++ b/ydb/core/http_proxy/grpc_service.cpp
@@ -148,19 +148,6 @@ void TGRpcDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp
SetupIncomingRequests(std::move(logger));
}
-void TGRpcDiscoveryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter *limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcDiscoveryService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcDiscoveryService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
#ifdef ADD_REQUEST
diff --git a/ydb/core/http_proxy/grpc_service.h b/ydb/core/http_proxy/grpc_service.h
index 8c246d380f..bc9ee0f726 100644
--- a/ydb/core/http_proxy/grpc_service.h
+++ b/ydb/core/http_proxy/grpc_service.h
@@ -20,10 +20,6 @@ public:
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -33,7 +29,6 @@ private:
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider_;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NYdbGrpc::TGlobalLimiter* Limiter_;
};
} // namespace NKikimr
diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h
index f120615583..b4c722e132 100644
--- a/ydb/library/grpc/server/grpc_request.h
+++ b/ydb/library/grpc/server/grpc_request.h
@@ -74,7 +74,7 @@ public:
, Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
, Request_(google::protobuf::Arena::CreateMessage<TIn>(&Arena_))
- , AuthState_(Server_->NeedAuth())
+ , AuthState_(server->NeedAuth())
{
Y_ABORT_UNLESS(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
@@ -101,7 +101,7 @@ public:
, StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
, Request_(google::protobuf::Arena::CreateMessage<TIn>(&Arena_))
- , AuthState_(Server_->NeedAuth())
+ , AuthState_(server->NeedAuth())
, StreamAdaptor_(CreateStreamAdaptor())
{
Y_ABORT_UNLESS(Request_);
@@ -251,10 +251,10 @@ private:
if (!Server_->IsShuttingDown()) {
if (RequestCallback_) {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ static_cast<TService*>(Server_), this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
} else {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ static_cast<TService*>(Server_), this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
}
}
}
@@ -543,7 +543,7 @@ private:
}
using TStateFunc = bool (TThis::*)(bool);
- TService* Server_ = nullptr;
+ TGrpcServiceProtectiable* Server_ = nullptr;
TOnRequest Cb_;
TRequestCallback RequestCallback_;
TStreamRequestCallback StreamRequestCallback_;
diff --git a/ydb/library/grpc/server/grpc_server.cpp b/ydb/library/grpc/server/grpc_server.cpp
index d14fc169c4..cb8432563f 100644
--- a/ydb/library/grpc/server/grpc_server.cpp
+++ b/ydb/library/grpc/server/grpc_server.cpp
@@ -89,6 +89,20 @@ void TGrpcServiceProtectiable::DeregisterRequestCtx(ICancelableContext* req) {
}
}
+bool TGrpcServiceProtectiable::IncRequest() {
+ if (Limiter_) {
+ return Limiter_->Inc();
+ }
+ return true;
+}
+
+void TGrpcServiceProtectiable::DecRequest() {
+ if (Limiter_) {
+ Limiter_->Dec();
+ Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
+ }
+}
+
TGRpcServer::TGRpcServer(const TServerOptions& opts)
: Options_(opts)
, Limiter_(Options_.MaxGlobalRequestInFlight)
diff --git a/ydb/library/grpc/server/grpc_server.h b/ydb/library/grpc/server/grpc_server.h
index 3e97ef56ab..5ab48d2f24 100644
--- a/ydb/library/grpc/server/grpc_server.h
+++ b/ydb/library/grpc/server/grpc_server.h
@@ -262,13 +262,19 @@ public:
};
public:
- void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
+ void SetGlobalLimiterHandle(TGlobalLimiter* limiter) override {
+ Limiter_ = limiter;
+ }
+
void StopService() noexcept override;
size_t RequestsInProgress() const override;
bool RegisterRequestCtx(ICancelableContext* req);
void DeregisterRequestCtx(ICancelableContext* req);
+ virtual bool IncRequest();
+ virtual void DecRequest();
+
TShutdownGuard ProtectShutdown() noexcept {
AtomicIncrement(GuardCount_);
if (IsShuttingDown()) {
@@ -322,6 +328,8 @@ private:
// Note: benchmarks showed 4 shards is enough to scale to ~30 threads
TVector<TShard> Shards_{ size_t(4) };
std::atomic<size_t> NextShard_{ 0 };
+
+ NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
template<typename T>
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 191c130937..57b3f05f54 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -849,19 +849,6 @@ namespace NYql::NDqs {
*/
}
- void TDqsGrpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
- }
-
- bool TDqsGrpcService::IncRequest() {
- return Limiter->Inc();
- }
-
- void TDqsGrpcService::DecRequest() {
- Limiter->Dec();
- Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
- }
-
TFuture<void> TDqsGrpcService::Stop() {
TGuard<TMutex> lock(Mutex);
Stopping = true;
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h
index b9b547ac89..7e4682af62 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.h
+++ b/ydb/library/yql/providers/dq/service/grpc_service.h
@@ -28,17 +28,12 @@ namespace NYql::NDqs {
const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
NThreading::TFuture<void> Stop();
private:
NActors::TActorSystem& ActorSystem;
grpc::ServerCompletionQueue* CQ = nullptr;
- NYdbGrpc::TGlobalLimiter* Limiter = nullptr;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories;
diff --git a/ydb/services/deprecated/persqueue_v0/persqueue.cpp b/ydb/services/deprecated/persqueue_v0/persqueue.cpp
index 4d3b4a0539..4f67462831 100644
--- a/ydb/services/deprecated/persqueue_v0/persqueue.cpp
+++ b/ydb/services/deprecated/persqueue_v0/persqueue.cpp
@@ -42,18 +42,6 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp
}
}
-void TGRpcPersQueueService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TGRpcPersQueueService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TGRpcPersQueueService::DecRequest() {
- Limiter->Dec();
-}
-
void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr /*logger*/) {
WriteService->SetupIncomingRequests();
ReadService->SetupIncomingRequests();
diff --git a/ydb/services/deprecated/persqueue_v0/persqueue.h b/ydb/services/deprecated/persqueue_v0/persqueue.h
index 7d79aa6f3d..158a0e6c72 100644
--- a/ydb/services/deprecated/persqueue_v0/persqueue.h
+++ b/ydb/services/deprecated/persqueue_v0/persqueue.h
@@ -27,14 +27,10 @@ public:
NYdbGrpc::TLoggerPtr logger,
size_t index) override;
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
void StopService() noexcept override;
using NYdbGrpc::TGrpcServiceBase<NPersQueue::PersQueueService>::GetService;
- bool IncRequest();
- void DecRequest();
-
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -43,7 +39,6 @@ private:
grpc::ServerCompletionQueue* CQ = nullptr;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
- NYdbGrpc::TGlobalLimiter* Limiter = nullptr;
NActors::TActorId SchemeCache;
std::shared_ptr<NGRpcProxy::TPQWriteService> WriteService;
diff --git a/ydb/services/fq/grpc_service.cpp b/ydb/services/fq/grpc_service.cpp
index 245fdc02e3..791b21314c 100644
--- a/ydb/services/fq/grpc_service.cpp
+++ b/ydb/services/fq/grpc_service.cpp
@@ -19,19 +19,6 @@ void TGRpcFederatedQueryService::InitService(grpc::ServerCompletionQueue *cq, NY
SetupIncomingRequests(std::move(logger));
}
-void TGRpcFederatedQueryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcFederatedQueryService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcFederatedQueryService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcFederatedQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
#ifdef ADD_REQUEST
diff --git a/ydb/services/fq/grpc_service.h b/ydb/services/fq/grpc_service.h
index 4e155f3fd3..a4e5bdcc82 100644
--- a/ydb/services/fq/grpc_service.h
+++ b/ydb/services/fq/grpc_service.h
@@ -17,10 +17,6 @@ public:
NActors::TActorId id);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -29,7 +25,6 @@ private:
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/fq/private_grpc.cpp b/ydb/services/fq/private_grpc.cpp
index 2faf9e9e16..46df839d6b 100644
--- a/ydb/services/fq/private_grpc.cpp
+++ b/ydb/services/fq/private_grpc.cpp
@@ -19,19 +19,6 @@ void TGRpcFqPrivateTaskService::InitService(grpc::ServerCompletionQueue* cq, NYd
SetupIncomingRequests(std::move(logger));
}
-void TGRpcFqPrivateTaskService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcFqPrivateTaskService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcFqPrivateTaskService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcFqPrivateTaskService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/fq/private_grpc.h b/ydb/services/fq/private_grpc.h
index 03326196e2..d7f3b7fdb8 100644
--- a/ydb/services/fq/private_grpc.h
+++ b/ydb/services/fq/private_grpc.h
@@ -15,10 +15,6 @@ public:
NActors::TActorId id);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -27,7 +23,6 @@ private:
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/fq/ydb_over_fq.cpp b/ydb/services/fq/ydb_over_fq.cpp
index b0783f76e5..88555abfe7 100644
--- a/ydb/services/fq/ydb_over_fq.cpp
+++ b/ydb/services/fq/ydb_over_fq.cpp
@@ -19,19 +19,6 @@ void TGRpcYdbOverFqService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp
SetupIncomingRequests(std::move(logger));
}
-void TGRpcYdbOverFqService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbOverFqService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbOverFqService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGrpcTableOverFqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
#if defined(ADD_REQUEST) or defined (ADD_REQUEST_IMPL)
diff --git a/ydb/services/fq/ydb_over_fq.h b/ydb/services/fq/ydb_over_fq.h
index d4f42a4623..67ff7a0c88 100644
--- a/ydb/services/fq/ydb_over_fq.h
+++ b/ydb/services/fq/ydb_over_fq.h
@@ -18,10 +18,6 @@ public:
NActors::TActorId id);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger);
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter);
-
- bool IncRequest();
- void DecRequest();
protected:
virtual void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) = 0;
@@ -30,7 +26,6 @@ protected:
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
class TGrpcTableOverFqService
@@ -43,9 +38,6 @@ public:
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override {
TBase::InitService(cq, std::move(logger));
}
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override {
- TBase::SetGlobalLimiterHandle(limiter);
- }
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override;
@@ -61,9 +53,6 @@ public:
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override {
TBase::InitService(cq, std::move(logger));
}
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override {
- TBase::SetGlobalLimiterHandle(limiter);
- }
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override;
diff --git a/ydb/services/keyvalue/grpc_service.cpp b/ydb/services/keyvalue/grpc_service.cpp
index 4cd1cc62ea..d6bfd2104f 100644
--- a/ydb/services/keyvalue/grpc_service.cpp
+++ b/ydb/services/keyvalue/grpc_service.cpp
@@ -22,18 +22,6 @@ void TKeyValueGRpcService::InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc
SetupIncomingRequests(std::move(logger));
}
-void TKeyValueGRpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TKeyValueGRpcService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TKeyValueGRpcService::DecRequest() {
- Limiter->Dec();
-}
-
void TKeyValueGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem);
diff --git a/ydb/services/keyvalue/grpc_service.h b/ydb/services/keyvalue/grpc_service.h
index fdf1bf0890..aa255ae6df 100644
--- a/ydb/services/keyvalue/grpc_service.h
+++ b/ydb/services/keyvalue/grpc_service.h
@@ -17,11 +17,6 @@ public:
~TKeyValueGRpcService();
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
-
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -31,7 +26,6 @@ private:
NActors::TActorId GRpcRequestProxyId;
grpc::ServerCompletionQueue* CQ = nullptr;
- NYdbGrpc::TGlobalLimiter* Limiter = nullptr;
};
} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/local_discovery/grpc_service.cpp b/ydb/services/local_discovery/grpc_service.cpp
index 3ea90d8b21..1d91312049 100644
--- a/ydb/services/local_discovery/grpc_service.cpp
+++ b/ydb/services/local_discovery/grpc_service.cpp
@@ -63,19 +63,6 @@ void TGRpcLocalDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NY
SetupIncomingRequests(std::move(logger));
}
-void TGRpcLocalDiscoveryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter *limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcLocalDiscoveryService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcLocalDiscoveryService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcLocalDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
using namespace Ydb;
diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h
index 57a487d6cc..cd83bf7188 100644
--- a/ydb/services/local_discovery/grpc_service.h
+++ b/ydb/services/local_discovery/grpc_service.h
@@ -24,10 +24,6 @@ public:
NActors::TActorId id);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -39,7 +35,6 @@ private:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/persqueue_cluster_discovery/grpc_service.h b/ydb/services/persqueue_cluster_discovery/grpc_service.h
index de54280552..9a2f7d9067 100644
--- a/ydb/services/persqueue_cluster_discovery/grpc_service.h
+++ b/ydb/services/persqueue_cluster_discovery/grpc_service.h
@@ -13,9 +13,9 @@ public:
NActors::TActorId id, const TMaybe<ui64>& requestsInflightLimit = Nothing());
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
- bool IncRequest();
- void DecRequest();
+ void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override final;
+ bool IncRequest() override final;
+ void DecRequest() override final;
void StopService() noexcept override;
using NYdbGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::ClusterDiscoveryService>::GetService;
diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp
index 456f00e948..30b566f9be 100644
--- a/ydb/services/rate_limiter/grpc_service.cpp
+++ b/ydb/services/rate_limiter/grpc_service.cpp
@@ -20,18 +20,6 @@ void TRateLimiterGRpcService::InitService(grpc::ServerCompletionQueue* cq, NYdbG
SetupIncomingRequests(std::move(logger));
}
-void TRateLimiterGRpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TRateLimiterGRpcService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TRateLimiterGRpcService::DecRequest() {
- Limiter->Dec();
-}
-
void TRateLimiterGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem);
using namespace NGRpcService;
diff --git a/ydb/services/rate_limiter/grpc_service.h b/ydb/services/rate_limiter/grpc_service.h
index 2464af4de4..2900dca791 100644
--- a/ydb/services/rate_limiter/grpc_service.h
+++ b/ydb/services/rate_limiter/grpc_service.h
@@ -14,10 +14,6 @@ public:
~TRateLimiterGRpcService();
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -28,7 +24,6 @@ private:
NActors::TActorId GRpcRequestProxyId;
grpc::ServerCompletionQueue* CQ = nullptr;
- NYdbGrpc::TGlobalLimiter* Limiter = nullptr;
};
} // namespace NKikimr::NQuoter
diff --git a/ydb/services/ydb/ydb_dummy.cpp b/ydb/services/ydb/ydb_dummy.cpp
index 88943b6f0a..de12795e77 100644
--- a/ydb/services/ydb/ydb_dummy.cpp
+++ b/ydb/services/ydb/ydb_dummy.cpp
@@ -122,19 +122,6 @@ void TGRpcYdbDummyService::InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc
SetupIncomingRequests(std::move(logger));
}
-void TGRpcYdbDummyService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbDummyService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbDummyService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbDummyService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_dummy.h b/ydb/services/ydb/ydb_dummy.h
index 3619d24fa7..f398f87ca3 100644
--- a/ydb/services/ydb/ydb_dummy.h
+++ b/ydb/services/ydb/ydb_dummy.h
@@ -15,10 +15,6 @@ public:
TGRpcYdbDummyService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId proxyActorId);
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
@@ -28,7 +24,6 @@ private:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
}