diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-10-17 11:47:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-17 12:47:49 +0300 |
commit | d0cfbf306137a4f30fbcfe98f9e270b060c57905 (patch) | |
tree | 50c91855c8a612265f5daf8991ce3b97ec40a254 | |
parent | a71cc84f0c2f4321af417de000484193208eec01 (diff) | |
download | ydb-d0cfbf306137a4f30fbcfe98f9e270b060c57905.tar.gz |
[refactoring] Get rid of grpc global request limiter copy-paste code. (#10518)
27 files changed, 38 insertions, 216 deletions
diff --git a/ydb/core/client/server/grpc_server.h b/ydb/core/client/server/grpc_server.h index ece9eb54e1..b7bf2cd3ac 100644 --- a/ydb/core/client/server/grpc_server.h +++ b/ydb/core/client/server/grpc_server.h @@ -57,13 +57,13 @@ public: TGRpcService(); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; + void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override final; NThreading::TFuture<void> Prepare(NActors::TActorSystem* system, const NActors::TActorId& pqMeta, const NActors::TActorId& msgBusProxy, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); void Start(); - bool IncRequest(); - void DecRequest(); + bool IncRequest() override final; + void DecRequest() override final; i64 GetCurrentInFlight() const; private: diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h index 27f16fb5f0..bd5d7e0015 100644 --- a/ydb/core/grpc_services/base/base_service.h +++ b/ydb/core/grpc_services/base/base_service.h @@ -73,19 +73,6 @@ public: SetupIncomingRequests(std::move(logger)); } - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override { - Limiter_ = limiter; - } - - bool IncRequest() { - return Limiter_->Inc(); - } - - void DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); - } - protected: virtual void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) = 0; @@ -97,8 +84,6 @@ protected: TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; const NActors::TActorId GRpcRequestProxyId_; const TVector<NActors::TActorId> GRpcProxies_; - - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp index d01f9b36d5..d6c723a9b2 100644 --- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp +++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp @@ -64,15 +64,15 @@ public: getCounterBlock("streaming", "Session")); } - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override { - Limiter = limiter; + void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter*) override { + // nothing } - bool IncRequest() { + bool IncRequest() override { return true; } - void DecRequest() { + void DecRequest() override { // nothing } @@ -81,7 +81,6 @@ private: TIntrusivePtr<::NMonitoring::TDynamicCounters> const Counters; grpc::ServerCompletionQueue* CQ = nullptr; - NYdbGrpc::TGlobalLimiter* Limiter = nullptr; }; template<class TImplActor> diff --git a/ydb/core/http_proxy/grpc_service.cpp b/ydb/core/http_proxy/grpc_service.cpp index 00f59ca06e..b310b55f15 100644 --- a/ydb/core/http_proxy/grpc_service.cpp +++ b/ydb/core/http_proxy/grpc_service.cpp @@ -148,19 +148,6 @@ void TGRpcDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp SetupIncomingRequests(std::move(logger)); } -void TGRpcDiscoveryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter *limiter) { - Limiter_ = limiter; -} - -bool TGRpcDiscoveryService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcDiscoveryService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_); #ifdef ADD_REQUEST diff --git a/ydb/core/http_proxy/grpc_service.h b/ydb/core/http_proxy/grpc_service.h index 8c246d380f..bc9ee0f726 100644 --- a/ydb/core/http_proxy/grpc_service.h +++ b/ydb/core/http_proxy/grpc_service.h @@ -20,10 +20,6 @@ public: TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -33,7 +29,6 @@ private: std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider_; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NYdbGrpc::TGlobalLimiter* Limiter_; }; } // namespace NKikimr diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index f120615583..b4c722e132 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -74,7 +74,7 @@ public: , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) , Request_(google::protobuf::Arena::CreateMessage<TIn>(&Arena_)) - , AuthState_(Server_->NeedAuth()) + , AuthState_(server->NeedAuth()) { Y_ABORT_UNLESS(Request_); GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); @@ -101,7 +101,7 @@ public: , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) , Request_(google::protobuf::Arena::CreateMessage<TIn>(&Arena_)) - , AuthState_(Server_->NeedAuth()) + , AuthState_(server->NeedAuth()) , StreamAdaptor_(CreateStreamAdaptor()) { Y_ABORT_UNLESS(Request_); @@ -251,10 +251,10 @@ private: if (!Server_->IsShuttingDown()) { if (RequestCallback_) { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + static_cast<TService*>(Server_), this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + static_cast<TService*>(Server_), this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } } @@ -543,7 +543,7 @@ private: } using TStateFunc = bool (TThis::*)(bool); - TService* Server_ = nullptr; + TGrpcServiceProtectiable* Server_ = nullptr; TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; diff --git a/ydb/library/grpc/server/grpc_server.cpp b/ydb/library/grpc/server/grpc_server.cpp index d14fc169c4..cb8432563f 100644 --- a/ydb/library/grpc/server/grpc_server.cpp +++ b/ydb/library/grpc/server/grpc_server.cpp @@ -89,6 +89,20 @@ void TGrpcServiceProtectiable::DeregisterRequestCtx(ICancelableContext* req) { } } +bool TGrpcServiceProtectiable::IncRequest() { + if (Limiter_) { + return Limiter_->Inc(); + } + return true; +} + +void TGrpcServiceProtectiable::DecRequest() { + if (Limiter_) { + Limiter_->Dec(); + Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); + } +} + TGRpcServer::TGRpcServer(const TServerOptions& opts) : Options_(opts) , Limiter_(Options_.MaxGlobalRequestInFlight) diff --git a/ydb/library/grpc/server/grpc_server.h b/ydb/library/grpc/server/grpc_server.h index 3e97ef56ab..5ab48d2f24 100644 --- a/ydb/library/grpc/server/grpc_server.h +++ b/ydb/library/grpc/server/grpc_server.h @@ -262,13 +262,19 @@ public: }; public: - void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} + void SetGlobalLimiterHandle(TGlobalLimiter* limiter) override { + Limiter_ = limiter; + } + void StopService() noexcept override; size_t RequestsInProgress() const override; bool RegisterRequestCtx(ICancelableContext* req); void DeregisterRequestCtx(ICancelableContext* req); + virtual bool IncRequest(); + virtual void DecRequest(); + TShutdownGuard ProtectShutdown() noexcept { AtomicIncrement(GuardCount_); if (IsShuttingDown()) { @@ -322,6 +328,8 @@ private: // Note: benchmarks showed 4 shards is enough to scale to ~30 threads TVector<TShard> Shards_{ size_t(4) }; std::atomic<size_t> NextShard_{ 0 }; + + NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; template<typename T> diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 191c130937..57b3f05f54 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -849,19 +849,6 @@ namespace NYql::NDqs { */ } - void TDqsGrpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; - } - - bool TDqsGrpcService::IncRequest() { - return Limiter->Inc(); - } - - void TDqsGrpcService::DecRequest() { - Limiter->Dec(); - Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); - } - TFuture<void> TDqsGrpcService::Stop() { TGuard<TMutex> lock(Mutex); Stopping = true; diff --git a/ydb/library/yql/providers/dq/service/grpc_service.h b/ydb/library/yql/providers/dq/service/grpc_service.h index b9b547ac89..7e4682af62 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.h +++ b/ydb/library/yql/providers/dq/service/grpc_service.h @@ -28,17 +28,12 @@ namespace NYql::NDqs { const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); NThreading::TFuture<void> Stop(); private: NActors::TActorSystem& ActorSystem; grpc::ServerCompletionQueue* CQ = nullptr; - NYdbGrpc::TGlobalLimiter* Limiter = nullptr; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; TDqTaskPreprocessorFactoryCollection DqTaskPreprocessorFactories; diff --git a/ydb/services/deprecated/persqueue_v0/persqueue.cpp b/ydb/services/deprecated/persqueue_v0/persqueue.cpp index 4d3b4a0539..4f67462831 100644 --- a/ydb/services/deprecated/persqueue_v0/persqueue.cpp +++ b/ydb/services/deprecated/persqueue_v0/persqueue.cpp @@ -42,18 +42,6 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp } } -void TGRpcPersQueueService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TGRpcPersQueueService::IncRequest() { - return Limiter->Inc(); -} - -void TGRpcPersQueueService::DecRequest() { - Limiter->Dec(); -} - void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr /*logger*/) { WriteService->SetupIncomingRequests(); ReadService->SetupIncomingRequests(); diff --git a/ydb/services/deprecated/persqueue_v0/persqueue.h b/ydb/services/deprecated/persqueue_v0/persqueue.h index 7d79aa6f3d..158a0e6c72 100644 --- a/ydb/services/deprecated/persqueue_v0/persqueue.h +++ b/ydb/services/deprecated/persqueue_v0/persqueue.h @@ -27,14 +27,10 @@ public: NYdbGrpc::TLoggerPtr logger, size_t index) override; void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; void StopService() noexcept override; using NYdbGrpc::TGrpcServiceBase<NPersQueue::PersQueueService>::GetService; - bool IncRequest(); - void DecRequest(); - private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -43,7 +39,6 @@ private: grpc::ServerCompletionQueue* CQ = nullptr; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; - NYdbGrpc::TGlobalLimiter* Limiter = nullptr; NActors::TActorId SchemeCache; std::shared_ptr<NGRpcProxy::TPQWriteService> WriteService; diff --git a/ydb/services/fq/grpc_service.cpp b/ydb/services/fq/grpc_service.cpp index 245fdc02e3..791b21314c 100644 --- a/ydb/services/fq/grpc_service.cpp +++ b/ydb/services/fq/grpc_service.cpp @@ -19,19 +19,6 @@ void TGRpcFederatedQueryService::InitService(grpc::ServerCompletionQueue *cq, NY SetupIncomingRequests(std::move(logger)); } -void TGRpcFederatedQueryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcFederatedQueryService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcFederatedQueryService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcFederatedQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); #ifdef ADD_REQUEST diff --git a/ydb/services/fq/grpc_service.h b/ydb/services/fq/grpc_service.h index 4e155f3fd3..a4e5bdcc82 100644 --- a/ydb/services/fq/grpc_service.h +++ b/ydb/services/fq/grpc_service.h @@ -17,10 +17,6 @@ public: NActors::TActorId id); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -29,7 +25,6 @@ private: TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/fq/private_grpc.cpp b/ydb/services/fq/private_grpc.cpp index 2faf9e9e16..46df839d6b 100644 --- a/ydb/services/fq/private_grpc.cpp +++ b/ydb/services/fq/private_grpc.cpp @@ -19,19 +19,6 @@ void TGRpcFqPrivateTaskService::InitService(grpc::ServerCompletionQueue* cq, NYd SetupIncomingRequests(std::move(logger)); } -void TGRpcFqPrivateTaskService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcFqPrivateTaskService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcFqPrivateTaskService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcFqPrivateTaskService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/fq/private_grpc.h b/ydb/services/fq/private_grpc.h index 03326196e2..d7f3b7fdb8 100644 --- a/ydb/services/fq/private_grpc.h +++ b/ydb/services/fq/private_grpc.h @@ -15,10 +15,6 @@ public: NActors::TActorId id); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -27,7 +23,6 @@ private: TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/fq/ydb_over_fq.cpp b/ydb/services/fq/ydb_over_fq.cpp index b0783f76e5..88555abfe7 100644 --- a/ydb/services/fq/ydb_over_fq.cpp +++ b/ydb/services/fq/ydb_over_fq.cpp @@ -19,19 +19,6 @@ void TGRpcYdbOverFqService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp SetupIncomingRequests(std::move(logger)); } -void TGRpcYdbOverFqService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbOverFqService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbOverFqService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGrpcTableOverFqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); #if defined(ADD_REQUEST) or defined (ADD_REQUEST_IMPL) diff --git a/ydb/services/fq/ydb_over_fq.h b/ydb/services/fq/ydb_over_fq.h index d4f42a4623..67ff7a0c88 100644 --- a/ydb/services/fq/ydb_over_fq.h +++ b/ydb/services/fq/ydb_over_fq.h @@ -18,10 +18,6 @@ public: NActors::TActorId id); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger); - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter); - - bool IncRequest(); - void DecRequest(); protected: virtual void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) = 0; @@ -30,7 +26,6 @@ protected: TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; class TGrpcTableOverFqService @@ -43,9 +38,6 @@ public: void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override { TBase::InitService(cq, std::move(logger)); } - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override { - TBase::SetGlobalLimiterHandle(limiter); - } private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override; @@ -61,9 +53,6 @@ public: void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override { TBase::InitService(cq, std::move(logger)); } - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override { - TBase::SetGlobalLimiterHandle(limiter); - } private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override; diff --git a/ydb/services/keyvalue/grpc_service.cpp b/ydb/services/keyvalue/grpc_service.cpp index 4cd1cc62ea..d6bfd2104f 100644 --- a/ydb/services/keyvalue/grpc_service.cpp +++ b/ydb/services/keyvalue/grpc_service.cpp @@ -22,18 +22,6 @@ void TKeyValueGRpcService::InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc SetupIncomingRequests(std::move(logger)); } -void TKeyValueGRpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TKeyValueGRpcService::IncRequest() { - return Limiter->Inc(); -} - -void TKeyValueGRpcService::DecRequest() { - Limiter->Dec(); -} - void TKeyValueGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem); diff --git a/ydb/services/keyvalue/grpc_service.h b/ydb/services/keyvalue/grpc_service.h index fdf1bf0890..aa255ae6df 100644 --- a/ydb/services/keyvalue/grpc_service.h +++ b/ydb/services/keyvalue/grpc_service.h @@ -17,11 +17,6 @@ public: ~TKeyValueGRpcService(); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); - private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -31,7 +26,6 @@ private: NActors::TActorId GRpcRequestProxyId; grpc::ServerCompletionQueue* CQ = nullptr; - NYdbGrpc::TGlobalLimiter* Limiter = nullptr; }; } // namespace NKikimr::NGRpcService diff --git a/ydb/services/local_discovery/grpc_service.cpp b/ydb/services/local_discovery/grpc_service.cpp index 3ea90d8b21..1d91312049 100644 --- a/ydb/services/local_discovery/grpc_service.cpp +++ b/ydb/services/local_discovery/grpc_service.cpp @@ -63,19 +63,6 @@ void TGRpcLocalDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NY SetupIncomingRequests(std::move(logger)); } -void TGRpcLocalDiscoveryService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter *limiter) { - Limiter_ = limiter; -} - -bool TGRpcLocalDiscoveryService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcLocalDiscoveryService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcLocalDiscoveryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); using namespace Ydb; diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h index 57a487d6cc..cd83bf7188 100644 --- a/ydb/services/local_discovery/grpc_service.h +++ b/ydb/services/local_discovery/grpc_service.h @@ -24,10 +24,6 @@ public: NActors::TActorId id); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -39,7 +35,6 @@ private: TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/persqueue_cluster_discovery/grpc_service.h b/ydb/services/persqueue_cluster_discovery/grpc_service.h index de54280552..9a2f7d9067 100644 --- a/ydb/services/persqueue_cluster_discovery/grpc_service.h +++ b/ydb/services/persqueue_cluster_discovery/grpc_service.h @@ -13,9 +13,9 @@ public: NActors::TActorId id, const TMaybe<ui64>& requestsInflightLimit = Nothing()); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - bool IncRequest(); - void DecRequest(); + void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override final; + bool IncRequest() override final; + void DecRequest() override final; void StopService() noexcept override; using NYdbGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::ClusterDiscoveryService>::GetService; diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp index 456f00e948..30b566f9be 100644 --- a/ydb/services/rate_limiter/grpc_service.cpp +++ b/ydb/services/rate_limiter/grpc_service.cpp @@ -20,18 +20,6 @@ void TRateLimiterGRpcService::InitService(grpc::ServerCompletionQueue* cq, NYdbG SetupIncomingRequests(std::move(logger)); } -void TRateLimiterGRpcService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TRateLimiterGRpcService::IncRequest() { - return Limiter->Inc(); -} - -void TRateLimiterGRpcService::DecRequest() { - Limiter->Dec(); -} - void TRateLimiterGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem); using namespace NGRpcService; diff --git a/ydb/services/rate_limiter/grpc_service.h b/ydb/services/rate_limiter/grpc_service.h index 2464af4de4..2900dca791 100644 --- a/ydb/services/rate_limiter/grpc_service.h +++ b/ydb/services/rate_limiter/grpc_service.h @@ -14,10 +14,6 @@ public: ~TRateLimiterGRpcService(); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -28,7 +24,6 @@ private: NActors::TActorId GRpcRequestProxyId; grpc::ServerCompletionQueue* CQ = nullptr; - NYdbGrpc::TGlobalLimiter* Limiter = nullptr; }; } // namespace NKikimr::NQuoter diff --git a/ydb/services/ydb/ydb_dummy.cpp b/ydb/services/ydb/ydb_dummy.cpp index 88943b6f0a..de12795e77 100644 --- a/ydb/services/ydb/ydb_dummy.cpp +++ b/ydb/services/ydb/ydb_dummy.cpp @@ -122,19 +122,6 @@ void TGRpcYdbDummyService::InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc SetupIncomingRequests(std::move(logger)); } -void TGRpcYdbDummyService::SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbDummyService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbDummyService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbDummyService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_dummy.h b/ydb/services/ydb/ydb_dummy.h index 3619d24fa7..f398f87ca3 100644 --- a/ydb/services/ydb/ydb_dummy.h +++ b/ydb/services/ydb/ydb_dummy.h @@ -15,10 +15,6 @@ public: TGRpcYdbDummyService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId proxyActorId); void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); @@ -28,7 +24,6 @@ private: TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NYdbGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } |