diff options
author | dcherednik <dcherednik@ydb.tech> | 2022-08-31 11:36:00 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2022-08-31 11:36:00 +0300 |
commit | bcd6b2653a988889aa6d990f5f30ca0ecac41a54 (patch) | |
tree | 079d5a654214c9dcb20a3fc905e35237e99eb0b9 | |
parent | db58955e8e90ba21f2d9b82fa9af131f40ff1c6f (diff) | |
download | ydb-bcd6b2653a988889aa6d990f5f30ca0ecac41a54.tar.gz |
Pass RL enable/disable flag in to grpc service.
44 files changed, 369 insertions, 914 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 6e6686b32b1..25f98caaf3c 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -496,64 +496,85 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NGrpc::TGRpcServer& server, NGrpc::TServerOptions& opts) { const auto& services = grpcConfig.GetServices(); + const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled(); + const auto& rlServicesDisabled = grpcConfig.GetRatelimiterServicesDisabled(); + + class TServiceCfg { + public: + TServiceCfg(bool enabled) + : ServiceEnabled(enabled) + { } + operator bool() const { + return ServiceEnabled; + } + bool IsRlAllowed() const { + return RlAllowed; + } + void SetRlAllowed(bool allowed) { + RlAllowed = allowed; + } + private: + bool ServiceEnabled = false; + bool RlAllowed = false; + }; - std::unordered_map<TString, bool*> names; + std::unordered_map<TString, TServiceCfg*> names; - bool hasLegacy = opts.SslData.Empty() && services.empty(); + TServiceCfg hasLegacy = opts.SslData.Empty() && services.empty(); names["legacy"] = &hasLegacy; - bool hasScripting = services.empty(); + TServiceCfg hasScripting = services.empty(); names["scripting"] = &hasScripting; - bool hasCms = services.empty(); + TServiceCfg hasCms = services.empty(); names["cms"] = &hasCms; - bool hasKesus = services.empty(); + TServiceCfg hasKesus = services.empty(); names["locking"] = names["kesus"] = &hasKesus; - bool hasMonitoring = services.empty(); + TServiceCfg hasMonitoring = services.empty(); names["monitoring"] = &hasMonitoring; - bool hasDiscovery = services.empty(); + TServiceCfg hasDiscovery = services.empty(); names["discovery"] = &hasDiscovery; - bool hasLocalDiscovery = false; + TServiceCfg hasLocalDiscovery = false; names["local_discovery"] = &hasLocalDiscovery; - bool hasTableService = services.empty(); + TServiceCfg hasTableService = services.empty(); names["table_service"] = &hasTableService; - bool hasSchemeService = false; - bool hasOperationService = false; - bool hasYql = false; + TServiceCfg hasSchemeService = false; + TServiceCfg hasOperationService = false; + TServiceCfg hasYql = false; names["yql"] = &hasYql; - bool hasYqlInternal = services.empty(); + TServiceCfg hasYqlInternal = services.empty(); names["yql_internal"] = &hasYqlInternal; - bool hasPQ = services.empty(); + TServiceCfg hasPQ = services.empty(); names["pq"] = &hasPQ; - bool hasPQv1 = services.empty(); + TServiceCfg hasPQv1 = services.empty(); names["pqv1"] = &hasPQv1; - bool hasTopic = services.empty(); + TServiceCfg hasTopic = services.empty(); names["topic"] = &hasTopic; - bool hasPQCD = services.empty(); + TServiceCfg hasPQCD = services.empty(); names["pqcd"] = &hasPQCD; - bool hasS3Internal = false; + TServiceCfg hasS3Internal = false; names["s3_internal"] = &hasS3Internal; - bool hasExperimental = false; + TServiceCfg hasExperimental = false; names["experimental"] = &hasExperimental; - bool hasClickhouseInternal = services.empty(); + TServiceCfg hasClickhouseInternal = services.empty(); names["clickhouse_internal"] = &hasClickhouseInternal; - bool hasRateLimiter = false; + TServiceCfg hasRateLimiter = false; names["rate_limiter"] = &hasRateLimiter; - bool hasLongTx = false; + TServiceCfg hasLongTx = false; names["long_tx"] = &hasLongTx; - bool hasExport = services.empty(); + TServiceCfg hasExport = services.empty(); names["export"] = &hasExport; - bool hasImport = services.empty(); + TServiceCfg hasImport = services.empty(); names["import"] = &hasImport; - bool hasAnalytics = false; + TServiceCfg hasAnalytics = false; names["analytics"] = &hasAnalytics; - bool hasDataStreams = false; + TServiceCfg hasDataStreams = false; names["datastreams"] = &hasDataStreams; - bool hasYandexQuery = false; + TServiceCfg hasYandexQuery = false; names["yq"] = &hasYandexQuery; - bool hasYandexQueryPrivate = false; + TServiceCfg hasYandexQueryPrivate = false; names["yq_private"] = &hasYandexQueryPrivate; - bool hasLogStore = false; + TServiceCfg hasLogStore = false; names["logstore"] = &hasLogStore; - bool hasAuth = services.empty(); + TServiceCfg hasAuth = services.empty(); names["auth"] = &hasAuth; std::unordered_set<TString> enabled; @@ -609,6 +630,34 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { hasImport = true; } + // Enable RL for all services if enabled list is empty + if (rlServicesEnabled.empty()) { + for (auto& [name, cfg] : names) { + Y_VERIFY(cfg); + cfg->SetRlAllowed(true); + } + } else { + for (const auto& name : rlServicesEnabled) { + auto itName = names.find(name); + if (itName != names.end()) { + Y_VERIFY(itName->second); + itName->second->SetRlAllowed(true); + } else if (!ModuleFactories || !ModuleFactories->GrpcServiceFactory.Has(name)) { + Cerr << "Unknown grpc service \"" << name << "\" rl was not enabled" << Endl; + } + } + } + + for (const auto& name : rlServicesDisabled) { + auto itName = names.find(name); + if (itName != names.end()) { + Y_VERIFY(itName->second); + itName->second->SetRlAllowed(false); + } else if (!ModuleFactories || !ModuleFactories->GrpcServiceFactory.Has(name)) { + Cerr << "Unknown grpc service \"" << name << "\" rl was not disabled" << Endl; + } + } + for (const auto& [name, isEnabled] : names) { if (*isEnabled) { enabled.insert(name); @@ -642,59 +691,70 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasTableService) { - server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId, + hasTableService.IsRlAllowed())); } if (hasExperimental) { server.AddService(new NGRpcService::TGRpcYdbExperimentalService(ActorSystem.Get(), Counters, - grpcRequestProxyId)); + grpcRequestProxyId, hasExperimental.IsRlAllowed())); } if (hasClickhouseInternal) { server.AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(ActorSystem.Get(), Counters, - AppData->InFlightLimiterRegistry, grpcRequestProxyId)); + AppData->InFlightLimiterRegistry, grpcRequestProxyId, hasClickhouseInternal.IsRlAllowed())); } if (hasS3Internal) { server.AddService(new NGRpcService::TGRpcYdbS3InternalService(ActorSystem.Get(), Counters, - grpcRequestProxyId)); + grpcRequestProxyId, hasS3Internal.IsRlAllowed())); } if (hasScripting) { server.AddService(new NGRpcService::TGRpcYdbScriptingService(ActorSystem.Get(), Counters, - grpcRequestProxyId)); + grpcRequestProxyId, hasScripting.IsRlAllowed())); } if (hasLongTx) { - server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasLongTx.IsRlAllowed())); } if (hasSchemeService) { - server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + // RPC RL enabled + // We have no way to disable or enable this service explicitly + server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters, + grpcRequestProxyId, true /*hasSchemeService.IsRlAllowed()*/)); } if (hasOperationService) { - server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasOperationService.IsRlAllowed())); } if (hasExport) { - server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasExport.IsRlAllowed())); } if (hasImport) { - server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasImport.IsRlAllowed())); } if (hasKesus) { - server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasKesus.IsRlAllowed())); } if (hasPQv1) { - server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); + server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), + grpcRequestProxyId, hasPQv1.IsRlAllowed())); } if (hasPQv1 || hasTopic) { - server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); + server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), + grpcRequestProxyId, hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed())); } if (hasPQCD) { @@ -710,15 +770,18 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasCms) { - server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasCms.IsRlAllowed())); } if (hasDiscovery) { - server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasDiscovery.IsRlAllowed())); } if (hasLocalDiscovery) { - server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, + grpcRequestProxyId)); } if (hasRateLimiter) { @@ -726,15 +789,18 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasMonitoring) { - server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasMonitoring.IsRlAllowed())); } if (hasAuth) { - server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasAuth.IsRlAllowed())); } if (hasDataStreams) { - server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasDataStreams.IsRlAllowed())); } if (hasYandexQuery) { @@ -746,7 +812,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasLogStore) { - server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasLogStore.IsRlAllowed())); } if (ModuleFactories) { diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 6f236c993b7..ac0dd35b3e6 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -315,6 +315,9 @@ enum class TRateLimiterMode : ui8 { RuOnProgress = 3, }; +#define RLSWITCH(mode) \ + IsRlAllowed() ? mode : TRateLimiterMode::Off + class ICheckerIface; // The way to pass some common data to request processing diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h new file mode 100644 index 00000000000..52db0b46df4 --- /dev/null +++ b/ydb/core/grpc_services/base/base_service.h @@ -0,0 +1,66 @@ +#pragma once + +#include <library/cpp/grpc/server/grpc_request_base.h> +#include <library/cpp/grpc/server/logger.h> + +namespace NKikimr { +namespace NGRpcService { + +class TGrpcServiceCfg { +public: + TGrpcServiceCfg(bool rlAllowed) + : RlAllowed_(rlAllowed) + { } + + bool IsRlAllowed() const { + return RlAllowed_; + } +private: + const bool RlAllowed_; +}; + +template <typename T> +class TGrpcServiceBase + : public NGrpc::TGrpcServiceBase<T> + , public TGrpcServiceCfg +{ +public: + TGrpcServiceBase(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id, bool rlAllowed) + : TGrpcServiceCfg(rlAllowed) + , ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(id) +{ } + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override { + CQ_ = cq; + SetupIncomingRequests(std::move(logger)); + } + + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override { + Limiter_ = limiter; + } + + bool IncRequest() { + return Limiter_->Inc(); + } + + void DecRequest() { + Limiter_->Dec(); + Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); + } + +protected: + virtual void SetupIncomingRequests(NGrpc::TLoggerPtr logger) = 0; + + NActors::TActorSystem* ActorSystem_; + grpc::ServerCompletionQueue* CQ_ = nullptr; + + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; + const NActors::TActorId GRpcRequestProxyId_; + + NGrpc::TGlobalLimiter* Limiter_ = nullptr; +}; + +} +} diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 6bc5f12f4e8..89177e53dbe 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -612,6 +612,9 @@ message TGRpcConfig { optional bool SkipSchemeCheck = 24 [default = false]; + repeated string RatelimiterServicesEnabled = 25; + repeated string RatelimiterServicesDisabled = 26; + // server socket options optional bool KeepAliveEnable = 100 [default = true]; // SO_KEEPALIVE optional uint32 KeepAliveIdleTimeoutTriggerSec = 101 [default = 90]; // TCP_KEEPIDLE diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index a34653f9546..fb2d8cf6fd0 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -307,25 +307,25 @@ namespace Tests { future.Subscribe(startCb); GRpcServer->AddService(grpcService); - GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbExperimentalService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbS3InternalService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbExperimentalService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbS3InternalService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId)); @@ -338,8 +338,8 @@ namespace Tests { GRpcServer->AddService(service); } } - GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId, true)); GRpcServer->Start(); } diff --git a/ydb/services/auth/grpc_service.cpp b/ydb/services/auth/grpc_service.cpp index 996cd501684..8a2eac7acbe 100644 --- a/ydb/services/auth/grpc_service.cpp +++ b/ydb/services/auth/grpc_service.cpp @@ -14,33 +14,6 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) { return TString{res[0]}; } -TGRpcAuthService::TGRpcAuthService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ -} - -void TGRpcAuthService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcAuthService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcAuthService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcAuthService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcAuthService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); #ifdef ADD_REQUEST diff --git a/ydb/services/auth/grpc_service.h b/ydb/services/auth/grpc_service.h index 61e2e884c96..09fa4b4e875 100644 --- a/ydb/services/auth/grpc_service.h +++ b/ydb/services/auth/grpc_service.h @@ -3,33 +3,19 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { - class TGRpcAuthService : public NGrpc::TGrpcServiceBase<Ydb::Auth::V1::AuthService> + class TGRpcAuthService : public TGrpcServiceBase<Ydb::Auth::V1::AuthService> { public: - TGRpcAuthService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using TGrpcServiceBase<Ydb::Auth::V1::AuthService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/cms/grpc_service.cpp b/ydb/services/cms/grpc_service.cpp index 5905ab33859..728a578c488 100644 --- a/ydb/services/cms/grpc_service.cpp +++ b/ydb/services/cms/grpc_service.cpp @@ -7,34 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcCmsService::TGRpcCmsService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ -} - -void TGRpcCmsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcCmsService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) -{ - Limiter_ = limiter; -} - -bool TGRpcCmsService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcCmsService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcCmsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); using namespace Ydb; @@ -49,7 +21,7 @@ void TGRpcCmsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TGrpcRequestOperationCall<Cms::NAME##Request, Cms::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ }, &Cms::V1::CmsService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("cms", #NAME))->Run(); diff --git a/ydb/services/cms/grpc_service.h b/ydb/services/cms/grpc_service.h index 8367051cce5..967eed68c95 100644 --- a/ydb/services/cms/grpc_service.h +++ b/ydb/services/cms/grpc_service.h @@ -3,34 +3,18 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_cms_v1.grpc.pb.h> - +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcCmsService - : public NGrpc::TGrpcServiceBase<Ydb::Cms::V1::CmsService> + : public TGrpcServiceBase<Ydb::Cms::V1::CmsService> { public: - TGRpcCmsService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); - - private: + using TGrpcServiceBase<Ydb::Cms::V1::CmsService>::TGrpcServiceBase; +private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp index 5337821d746..7df7b15f819 100644 --- a/ydb/services/datastreams/grpc_service.cpp +++ b/ydb/services/datastreams/grpc_service.cpp @@ -37,36 +37,6 @@ void YdsProcessAttr(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, namespace NKikimr::NGRpcService { -TGRpcDataStreamsService::TGRpcDataStreamsService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ -} - -void TGRpcDataStreamsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) -{ - CQ_ = cq; - - SetupIncomingRequests(logger); -} - -void TGRpcDataStreamsService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) { - Limiter_ = limiter; -} - -bool TGRpcDataStreamsService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcDataStreamsService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - - void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/datastreams/grpc_service.h b/ydb/services/datastreams/grpc_service.h index 0c80b53ee8f..0dbab489933 100644 --- a/ydb/services/datastreams/grpc_service.h +++ b/ydb/services/datastreams/grpc_service.h @@ -3,31 +3,16 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/draft/ydb_datastreams_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr::NGRpcService { - class TGRpcDataStreamsService : public NGrpc::TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService> + class TGRpcDataStreamsService : public TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService> { public: - TGRpcDataStreamsService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); - + using TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } diff --git a/ydb/services/discovery/grpc_service.cpp b/ydb/services/discovery/grpc_service.cpp index 2df0e98629b..2ffebc9ff36 100644 --- a/ydb/services/discovery/grpc_service.cpp +++ b/ydb/services/discovery/grpc_service.cpp @@ -16,34 +16,7 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) { return TString{res[0]}; } -TGRpcDiscoveryService::TGRpcDiscoveryService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) - { - } - - void TGRpcDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); - } - - void TGRpcDiscoveryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) { - Limiter_ = limiter; - } - - bool TGRpcDiscoveryService::IncRequest() { - return Limiter_->Inc(); - } - - void TGRpcDiscoveryService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); - } - - void TGRpcDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { +void TGRpcDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); using namespace Ydb; #ifdef ADD_REQUEST @@ -56,7 +29,7 @@ TGRpcDiscoveryService::TGRpcDiscoveryService(NActors::TActorSystem *system, NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TGrpcRequestOperationCall<Discovery::NAME##Request, Discovery::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); diff --git a/ydb/services/discovery/grpc_service.h b/ydb/services/discovery/grpc_service.h index 4b36484577b..697f6e1c07b 100644 --- a/ydb/services/discovery/grpc_service.h +++ b/ydb/services/discovery/grpc_service.h @@ -6,32 +6,21 @@ #include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> + + namespace NKikimr { namespace NGRpcService { class TGRpcDiscoveryService - : public NGrpc::TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService> + : public TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService> { - public: - TGRpcDiscoveryService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + public: + using TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/kesus/grpc_service.cpp b/ydb/services/kesus/grpc_service.cpp index 7d991c50028..2ee2515df81 100644 --- a/ydb/services/kesus/grpc_service.cpp +++ b/ydb/services/kesus/grpc_service.cpp @@ -607,39 +607,9 @@ private: //////////////////////////////////////////////////////////////////////////////// -TKesusGRpcService::TKesusGRpcService( - NActors::TActorSystem* actorSystem, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem(actorSystem) - , Counters(counters) - , GRpcRequestProxyId(id) -{} - -TKesusGRpcService::~TKesusGRpcService() { - // empty -} - -void TKesusGRpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - CQ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TKesusGRpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TKesusGRpcService::IncRequest() { - return Limiter->Inc(); -} - -void TKesusGRpcService::DecRequest() { - Limiter->Dec(); - Y_ASSERT(Limiter->GetCurrentInFlight() >= 0); -} - void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { - auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem); + auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_); + using NGRpcService::TRateLimiterMode; #ifdef ADD_REQUEST #error ADD_REQUEST macro is already defined @@ -649,12 +619,12 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \ this, \ &Service_, \ - CQ, \ + CQ_, \ [this](NGrpc::IRequestContextBase* reqCtx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \ - ActorSystem->Send(GRpcRequestProxyId, \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \ - (reqCtx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \ + (reqCtx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ }, \ &Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \ "Coordination/" #NAME, \ @@ -671,13 +641,13 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { TGRpcSessionActor::TGRpcRequest::Start( this, this->GetService(), - CQ, + CQ_, &Ydb::Coordination::V1::CoordinationService::AsyncService::RequestSession, [this](TIntrusivePtr<TGRpcSessionActor::IContext> context) { - NGRpcService::ReportGrpcReqToMon(*ActorSystem, context->GetPeerName()); - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvCoordinationSessionRequest(context)); + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, context->GetPeerName()); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCoordinationSessionRequest(context)); }, - *ActorSystem, + *ActorSystem_, "Coordination/Session", getCounterBlock("coordination", "Session", true, true), /* TODO: limiter */ nullptr); diff --git a/ydb/services/kesus/grpc_service.h b/ydb/services/kesus/grpc_service.h index 6317fdf1dad..ab7ae0645eb 100644 --- a/ydb/services/kesus/grpc_service.h +++ b/ydb/services/kesus/grpc_service.h @@ -8,38 +8,23 @@ #include <util/generic/hash_set.h> +#include <ydb/core/grpc_services/base/base_service.h> + + namespace NKikimr { namespace NKesus { class TKesusGRpcService - : public NGrpc::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService> + : public ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService> { class TContextBase; class TSessionContext; public: - TKesusGRpcService( - NActors::TActorSystem* actorSystem, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - ~TKesusGRpcService(); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - -private: - NActors::TActorSystem* ActorSystem; - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - NActors::TActorId GRpcRequestProxyId; - - grpc::ServerCompletionQueue* CQ = nullptr; - NGrpc::TGlobalLimiter* Limiter = nullptr; }; } diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h index 02b5232174a..f58e81811c5 100644 --- a/ydb/services/local_discovery/grpc_service.h +++ b/ydb/services/local_discovery/grpc_service.h @@ -6,6 +6,7 @@ #include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { diff --git a/ydb/services/monitoring/grpc_service.cpp b/ydb/services/monitoring/grpc_service.cpp index 6106b26087a..cd25b10ae3b 100644 --- a/ydb/services/monitoring/grpc_service.cpp +++ b/ydb/services/monitoring/grpc_service.cpp @@ -17,33 +17,6 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) { return TString{res[0]}; } -TGRpcMonitoringService::TGRpcMonitoringService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ -} - -void TGRpcMonitoringService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcMonitoringService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcMonitoringService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcMonitoringService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcMonitoringService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); using namespace Ydb; diff --git a/ydb/services/monitoring/grpc_service.h b/ydb/services/monitoring/grpc_service.h index f9d2c7e6a52..93dfce7a1bb 100644 --- a/ydb/services/monitoring/grpc_service.h +++ b/ydb/services/monitoring/grpc_service.h @@ -5,32 +5,17 @@ #include <ydb/public/api/grpc/ydb_monitoring_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { - class TGRpcMonitoringService : public NGrpc::TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService> + class TGRpcMonitoringService : public TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService> { public: - TGRpcMonitoringService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); - - private: + using TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService>::TGrpcServiceBase; + private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp index 74ea2f42d4f..9a319d9d785 100644 --- a/ydb/services/persqueue_v1/persqueue.cpp +++ b/ydb/services/persqueue_v1/persqueue.cpp @@ -17,58 +17,44 @@ namespace V1 { static const ui32 PersQueueWriteSessionsMaxCount = 1000000; static const ui32 PersQueueReadSessionsMaxCount = 100000; -TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy) - : ActorSystem(system) - , Counters(counters) +TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed) + : TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>(system, counters, grpcRequestProxy, rlAllowed) , SchemeCache(schemeCache) - , GRpcRequestProxy(grpcRequestProxy) { } void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ = cq; + CQ_ = cq; InitNewSchemeCacheActor(); - if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { + if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { - IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, PersQueueWriteSessionsMaxCount); - TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); + IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, PersQueueWriteSessionsMaxCount); + TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); - IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, PersQueueReadSessionsMaxCount); - actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); + IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, PersQueueReadSessionsMaxCount); + actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); - IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters); - actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); + IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_); + actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); SetupIncomingRequests(std::move(logger)); } } -void TGRpcPersQueueService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TGRpcPersQueueService::IncRequest() { - return Limiter->Inc(); -} - -void TGRpcPersQueueService::DecRequest() { - Limiter->Dec(); -} - void TGRpcPersQueueService::InitNewSchemeCacheActor() { - auto appData = ActorSystem->AppData<TAppData>(); - auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache"); + auto appData = ActorSystem_->AppData<TAppData>(); + auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache"); auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); - NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), - TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), + TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); } void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { - auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); + auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); { using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage; @@ -82,11 +68,11 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NKikimrServices::GRPC_SERVER>; - TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite, + TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite, [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { - ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context)); + ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context)); }, - *ActorSystem, "PersQueueService/CreateWriteSession", getCounterBlock("persistent_queue", "WriteSession", true, true), nullptr + *ActorSystem_, "PersQueueService/CreateWriteSession", getCounterBlock("persistent_queue", "WriteSession", true, true), nullptr ); } @@ -102,11 +88,11 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NKikimrServices::GRPC_SERVER>; - TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestMigrationStreamingRead, + TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestMigrationStreamingRead, [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { - ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest(context)); + ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest(context)); }, - *ActorSystem, "PersQueueService/CreateMigrationReadSession", getCounterBlock("persistent_queue", "MigrationReadSession", true, true), nullptr + *ActorSystem_, "PersQueueService/CreateMigrationReadSession", getCounterBlock("persistent_queue", "MigrationReadSession", true, true), nullptr ); } @@ -114,35 +100,35 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #error ADD_REQUEST macro already defined #endif #define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \ + MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ_, \ [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ACTION; \ }, &Ydb::PersQueue::V1::SVC::AsyncService::Request ## NAME, \ "PersQueueService/"#NAME, logger, getCounterBlock("persistent_queue", #NAME))->Run(); ADD_REQUEST(GetReadSessionsInfo, PersQueueService, ReadInfoRequest, ReadInfoResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQReadInfoRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQReadInfoRequest(ctx)); }) ADD_REQUEST(DropTopic, PersQueueService, DropTopicRequest, DropTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDropTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDropTopicRequest(ctx)); }) ADD_REQUEST(CreateTopic, PersQueueService, CreateTopicRequest, CreateTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQCreateTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQCreateTopicRequest(ctx)); }) ADD_REQUEST(AlterTopic, PersQueueService, AlterTopicRequest, AlterTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAlterTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAlterTopicRequest(ctx)); }) ADD_REQUEST(DescribeTopic, PersQueueService, DescribeTopicRequest, DescribeTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDescribeTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDescribeTopicRequest(ctx)); }) ADD_REQUEST(AddReadRule, PersQueueService, AddReadRuleRequest, AddReadRuleResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAddReadRuleRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAddReadRuleRequest(ctx)); }) ADD_REQUEST(RemoveReadRule, PersQueueService, RemoveReadRuleRequest, RemoveReadRuleResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx)); }) #undef ADD_REQUEST diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h index 7646d3a6093..cabcaa18f0f 100644 --- a/ydb/services/persqueue_v1/persqueue.h +++ b/ydb/services/persqueue_v1/persqueue.h @@ -5,7 +5,7 @@ #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> - +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { @@ -13,33 +13,24 @@ namespace NGRpcService { namespace V1 { class TGRpcPersQueueService - : public NGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService> + : public TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService> { public: - TGRpcPersQueueService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy); + TGRpcPersQueueService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy, bool rlAllowed); void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; void StopService() noexcept override; using NGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>::GetService; - bool IncRequest(); - void DecRequest(); private: - void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override; void InitNewSchemeCacheActor(); - NActors::TActorSystem* ActorSystem; - grpc::ServerCompletionQueue* CQ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - NGrpc::TGlobalLimiter* Limiter = nullptr; NActors::TActorId SchemeCache; NActors::TActorId NewSchemeCache; - NActors::TActorId GRpcRequestProxy; }; } // namespace V1 diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index 9d01f2686a6..8dd541760e5 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -17,58 +17,44 @@ namespace V1 { static const ui32 TopicWriteSessionsMaxCount = 1000000; static const ui32 TopicReadSessionsMaxCount = 100000; -TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy) - : ActorSystem(system) - , Counters(counters) +TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed) + : TGrpcServiceBase<Ydb::Topic::V1::TopicService>(system, counters, grpcRequestProxy, rlAllowed) , SchemeCache(schemeCache) - , GRpcRequestProxy(grpcRequestProxy) { } void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ = cq; + CQ_ = cq; InitNewSchemeCacheActor(); - if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { + if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { - IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount); - TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); + IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, TopicWriteSessionsMaxCount); + TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); - IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount); - actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); + IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, TopicReadSessionsMaxCount); + actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); - IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters); - actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); - ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); + IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_); + actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); + ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); SetupIncomingRequests(std::move(logger)); } } -void TGRpcTopicService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter = limiter; -} - -bool TGRpcTopicService::IncRequest() { - return Limiter->Inc(); -} - -void TGRpcTopicService::DecRequest() { - Limiter->Dec(); -} - void TGRpcTopicService::InitNewSchemeCacheActor() { - auto appData = ActorSystem->AppData<TAppData>(); - auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache"); + auto appData = ActorSystem_->AppData<TAppData>(); + auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache"); auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); - NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), - TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), + TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); } void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { - auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); + auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); { using TBiRequest = Ydb::Topic::StreamWriteMessage::FromClient; @@ -82,11 +68,11 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NKikimrServices::GRPC_SERVER>; - TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite, + TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite, [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { - ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context)); + ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context)); }, - *ActorSystem, "TopicService/StreamWrite", getCounterBlock("topic", "StreamWrite", true, true), nullptr + *ActorSystem_, "TopicService/StreamWrite", getCounterBlock("topic", "StreamWrite", true, true), nullptr ); } @@ -102,11 +88,11 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NKikimrServices::GRPC_SERVER>; - TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead, + TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead, [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { - ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context)); + ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context)); }, - *ActorSystem, "TopicService/StreamRead", getCounterBlock("topic", "StreamRead", true, true), nullptr + *ActorSystem_, "TopicService/StreamRead", getCounterBlock("topic", "StreamRead", true, true), nullptr ); } @@ -114,24 +100,24 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #error ADD_REQUEST macro already defined #endif #define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ, \ + MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ_, \ [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ACTION; \ }, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \ "TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run(); ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvDropTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx)); }) ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvCreateTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCreateTopicRequest(ctx)); }) ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvAlterTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvAlterTopicRequest(ctx)); }) ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, { - ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvDescribeTopicRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx)); }) #undef ADD_REQUEST diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h index 9539d63b32f..a75f1b83754 100644 --- a/ydb/services/persqueue_v1/topic.h +++ b/ydb/services/persqueue_v1/topic.h @@ -6,6 +6,8 @@ #include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> + namespace NKikimr { @@ -13,33 +15,24 @@ namespace NGRpcService { namespace V1 { class TGRpcTopicService - : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService> + : public TGrpcServiceBase<Ydb::Topic::V1::TopicService> { public: - TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy); + TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy, bool rlAllowed); void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; void StopService() noexcept override; using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService; - bool IncRequest(); - void DecRequest(); private: - void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override; void InitNewSchemeCacheActor(); - NActors::TActorSystem* ActorSystem; - grpc::ServerCompletionQueue* CQ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - NGrpc::TGlobalLimiter* Limiter = nullptr; NActors::TActorId SchemeCache; NActors::TActorId NewSchemeCache; - NActors::TActorId GRpcRequestProxy; }; } // namespace V1 diff --git a/ydb/services/ydb/ydb_clickhouse_internal.cpp b/ydb/services/ydb/ydb_clickhouse_internal.cpp index a2f0208cf72..30f3615ca09 100644 --- a/ydb/services/ydb/ydb_clickhouse_internal.cpp +++ b/ydb/services/ydb/ydb_clickhouse_internal.cpp @@ -10,29 +10,11 @@ namespace NGRpcService { TGRpcYdbClickhouseInternalService::TGRpcYdbClickhouseInternalService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) + NActors::TActorId id, + bool rlAllowed) + : TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService>(system, counters, id, rlAllowed) , LimiterRegistry_(inFlightLimiterRegistry) - , GRpcRequestProxyId_(id) {} - -void TGRpcYdbClickhouseInternalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbClickhouseInternalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbClickhouseInternalService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbClickhouseInternalService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} +{} void TGRpcYdbClickhouseInternalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); @@ -47,7 +29,7 @@ void TGRpcYdbClickhouseInternalService::SetupIncomingRequests(NGrpc::TLoggerPtr NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new NGRpcService::TGrpcRequestOperationCall<Ydb::ClickhouseInternal::IN, Ydb::ClickhouseInternal::OUT> \ - (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \ + (ctx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(NGRpcService::TRateLimiterMode::Rps), nullptr})); \ }, &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("clickhouse_internal", #NAME), getLimiter("ClickhouseInternal", #NAME, DEFAULT_MAX_IN_FLIGHT))->Run(); diff --git a/ydb/services/ydb/ydb_clickhouse_internal.h b/ydb/services/ydb/ydb_clickhouse_internal.h index 87c889d1823..d57b776bd62 100644 --- a/ydb/services/ydb/ydb_clickhouse_internal.h +++ b/ydb/services/ydb/ydb_clickhouse_internal.h @@ -7,34 +7,26 @@ #include <library/cpp/actors/core/actorsystem.h> +#include <ydb/core/grpc_services/base/base_service.h> + + namespace NKikimr { namespace NGRpcService { class TGRpcYdbClickhouseInternalService - : public NGrpc::TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService> + : public TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService> { private: constexpr static i64 DEFAULT_MAX_IN_FLIGHT = 200; public: TGRpcYdbClickhouseInternalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> inFlightLimiterRegistry, NActors::TActorId id); + TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> inFlightLimiterRegistry, NActors::TActorId id, bool rlAllowed); - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> LimiterRegistry_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_experimental.cpp b/ydb/services/ydb/ydb_experimental.cpp index 3b40907b189..c5904aef474 100644 --- a/ydb/services/ydb/ydb_experimental.cpp +++ b/ydb/services/ydb/ydb_experimental.cpp @@ -7,30 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbExperimentalService::TGRpcYdbExperimentalService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) {} - -void TGRpcYdbExperimentalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbExperimentalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbExperimentalService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbExperimentalService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbExperimentalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_experimental.h b/ydb/services/ydb/ydb_experimental.h index 206e0f9c85f..e13f11a7906 100644 --- a/ydb/services/ydb/ydb_experimental.h +++ b/ydb/services/ydb/ydb_experimental.h @@ -5,31 +5,19 @@ #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/draft/ydb_experimental_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbExperimentalService - : public NGrpc::TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService> + : public TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService> { public: - TGRpcYdbExperimentalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); + using TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService>::TGrpcServiceBase; - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_export.cpp b/ydb/services/ydb/ydb_export.cpp index cd56a41885d..5b279782db2 100644 --- a/ydb/services/ydb/ydb_export.cpp +++ b/ydb/services/ydb/ydb_export.cpp @@ -7,30 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbExportService::TGRpcYdbExportService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } - -void TGRpcYdbExportService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbExportService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbExportService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbExportService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbExportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_export.h b/ydb/services/ydb/ydb_export.h index 1e16f4d8258..44258ec94d8 100644 --- a/ydb/services/ydb/ydb_export.h +++ b/ydb/services/ydb/ydb_export.h @@ -3,30 +3,19 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_export_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbExportService - : public NGrpc::TGrpcServiceBase<Ydb::Export::V1::ExportService> + : public TGrpcServiceBase<Ydb::Export::V1::ExportService> { public: - TGRpcYdbExportService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using TGrpcServiceBase<Ydb::Export::V1::ExportService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_import.cpp b/ydb/services/ydb/ydb_import.cpp index 029fcbba4a4..300747d7cc0 100644 --- a/ydb/services/ydb/ydb_import.cpp +++ b/ydb/services/ydb/ydb_import.cpp @@ -7,30 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbImportService::TGRpcYdbImportService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } - -void TGRpcYdbImportService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbImportService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbImportService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbImportService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbImportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_import.h b/ydb/services/ydb/ydb_import.h index 13e74d59036..bc05afffcea 100644 --- a/ydb/services/ydb/ydb_import.h +++ b/ydb/services/ydb/ydb_import.h @@ -3,30 +3,20 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_import_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> + namespace NKikimr { namespace NGRpcService { class TGRpcYdbImportService - : public NGrpc::TGrpcServiceBase<Ydb::Import::V1::ImportService> + : public TGrpcServiceBase<Ydb::Import::V1::ImportService> { public: - TGRpcYdbImportService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + using TGrpcServiceBase<Ydb::Import::V1::ImportService>::TGrpcServiceBase; - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_logstore.cpp b/ydb/services/ydb/ydb_logstore.cpp index 0e151a9ac46..9b07a408fc3 100644 --- a/ydb/services/ydb/ydb_logstore.cpp +++ b/ydb/services/ydb/ydb_logstore.cpp @@ -7,31 +7,6 @@ namespace NKikimr::NGRpcService { -TGRpcYdbLogStoreService::TGRpcYdbLogStoreService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) {} - -void TGRpcYdbLogStoreService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbLogStoreService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbLogStoreService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbLogStoreService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbLogStoreService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { using namespace Ydb; auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_logstore.h b/ydb/services/ydb/ydb_logstore.h index 289bd296bfe..460c9683a0f 100644 --- a/ydb/services/ydb/ydb_logstore.h +++ b/ydb/services/ydb/ydb_logstore.h @@ -5,29 +5,17 @@ #include <library/cpp/grpc/server/grpc_server.h> #include <library/cpp/actors/core/actorsystem.h> +#include <ydb/core/grpc_services/base/base_service.h> + namespace NKikimr::NGRpcService { class TGRpcYdbLogStoreService - : public NGrpc::TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService> + : public TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService> { public: - TGRpcYdbLogStoreService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp index 4c7aadcedaf..9f7d6fe355e 100644 --- a/ydb/services/ydb/ydb_long_tx.cpp +++ b/ydb/services/ydb/ydb_long_tx.cpp @@ -7,33 +7,6 @@ namespace NKikimr { namespace NGRpcService { - -TGRpcYdbLongTxService::TGRpcYdbLongTxService(NActors::TActorSystem* system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{} - -void TGRpcYdbLongTxService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbLongTxService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbLongTxService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbLongTxService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbLongTxService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_long_tx.h b/ydb/services/ydb/ydb_long_tx.h index 53d50b3c357..75ffe47800e 100644 --- a/ydb/services/ydb/ydb_long_tx.h +++ b/ydb/services/ydb/ydb_long_tx.h @@ -3,31 +3,18 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/draft/ydb_long_tx_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbLongTxService - : public NGrpc::TGrpcServiceBase<Ydb::LongTx::V1::LongTxService> + : public TGrpcServiceBase<Ydb::LongTx::V1::LongTxService> { public: - TGRpcYdbLongTxService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using TGrpcServiceBase<Ydb::LongTx::V1::LongTxService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_operation.cpp b/ydb/services/ydb/ydb_operation.cpp index c611f13ac15..f202a0b8c8c 100644 --- a/ydb/services/ydb/ydb_operation.cpp +++ b/ydb/services/ydb/ydb_operation.cpp @@ -8,30 +8,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcOperationService::TGRpcOperationService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } - -void TGRpcOperationService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcOperationService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) { - Limiter_ = limiter; -} - -bool TGRpcOperationService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcOperationService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcOperationService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); using namespace Ydb; @@ -46,7 +22,7 @@ void TGRpcOperationService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TCALL<Operations::NAME##Request, Operations::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ }, &Operation::V1::OperationService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("operation", #NAME))->Run(); diff --git a/ydb/services/ydb/ydb_operation.h b/ydb/services/ydb/ydb_operation.h index fc2ea409eb5..2e3959997a5 100644 --- a/ydb/services/ydb/ydb_operation.h +++ b/ydb/services/ydb/ydb_operation.h @@ -3,30 +3,18 @@ #include <library/cpp/actors/core/actorsystem.h> #include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcOperationService - : public NGrpc::TGrpcServiceBase<Ydb::Operation::V1::OperationService> + : public TGrpcServiceBase<Ydb::Operation::V1::OperationService> { public: - TGRpcOperationService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); + using TGrpcServiceBase<Ydb::Operation::V1::OperationService>::TGrpcServiceBase; private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_s3_internal.cpp b/ydb/services/ydb/ydb_s3_internal.cpp index 326097b8724..3dcada013e6 100644 --- a/ydb/services/ydb/ydb_s3_internal.cpp +++ b/ydb/services/ydb/ydb_s3_internal.cpp @@ -7,30 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbS3InternalService::TGRpcYdbS3InternalService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) {} - -void TGRpcYdbS3InternalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbS3InternalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbS3InternalService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbS3InternalService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbS3InternalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); diff --git a/ydb/services/ydb/ydb_s3_internal.h b/ydb/services/ydb/ydb_s3_internal.h index 58c2559eb73..9306ed09f1f 100644 --- a/ydb/services/ydb/ydb_s3_internal.h +++ b/ydb/services/ydb/ydb_s3_internal.h @@ -3,31 +3,19 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/draft/ydb_s3_internal_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbS3InternalService - : public NGrpc::TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService> + : public TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService> { public: - TGRpcYdbS3InternalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); + using TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService>::TGrpcServiceBase; - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_scheme.cpp b/ydb/services/ydb/ydb_scheme.cpp index 1c9195ee853..0bc6bc45b22 100644 --- a/ydb/services/ydb/ydb_scheme.cpp +++ b/ydb/services/ydb/ydb_scheme.cpp @@ -8,30 +8,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbSchemeService::TGRpcYdbSchemeService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } - -void TGRpcYdbSchemeService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbSchemeService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbSchemeService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbSchemeService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbSchemeService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); @@ -45,7 +21,7 @@ void TGRpcYdbSchemeService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TGrpcRequestOperationCall<Ydb::Scheme::NAME##Request, Ydb::Scheme::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ }, &Ydb::Scheme::V1::SchemeService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("scheme", #NAME))->Run(); diff --git a/ydb/services/ydb/ydb_scheme.h b/ydb/services/ydb/ydb_scheme.h index f35823e365a..9356c35e81d 100644 --- a/ydb/services/ydb/ydb_scheme.h +++ b/ydb/services/ydb/ydb_scheme.h @@ -3,30 +3,20 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_scheme_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbSchemeService - : public NGrpc::TGrpcServiceBase<Ydb::Scheme::V1::SchemeService> + : public TGrpcServiceBase<Ydb::Scheme::V1::SchemeService> { public: - TGRpcYdbSchemeService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id); + using TGrpcServiceBase<Ydb::Scheme::V1::SchemeService>::TGrpcServiceBase; - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_scripting.cpp b/ydb/services/ydb/ydb_scripting.cpp index e5164beb9c4..7562d31cc16 100644 --- a/ydb/services/ydb/ydb_scripting.cpp +++ b/ydb/services/ydb/ydb_scripting.cpp @@ -8,30 +8,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbScriptingService::TGRpcYdbScriptingService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) {} - -void TGRpcYdbScriptingService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbScriptingService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbScriptingService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbScriptingService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbScriptingService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { using Ydb::Scripting::ExecuteYqlRequest; using Ydb::Scripting::ExecuteYqlResponse; @@ -55,19 +31,19 @@ void TGRpcYdbScriptingService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(ExecuteYql, ExecuteYqlRequest, ExecuteYqlResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestOperationCall<ExecuteYqlRequest, ExecuteYqlResponse> - (ctx, &DoExecuteYqlScript, TRequestAuxSettings{TRateLimiterMode::Ru, nullptr})); + (ctx, &DoExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Ru), nullptr})); }) ADD_REQUEST(StreamExecuteYql, ExecuteYqlRequest, ExecuteYqlPartialResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteYqlRequest, ExecuteYqlPartialResponse> - (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); + (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) ADD_REQUEST(ExplainYql, ExplainYqlRequest, ExplainYqlResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestOperationCall<ExplainYqlRequest, ExplainYqlResponse> - (ctx, &DoExplainYqlScript, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); + (ctx, &DoExplainYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_scripting.h b/ydb/services/ydb/ydb_scripting.h index c68839abb21..67de75c14c5 100644 --- a/ydb/services/ydb/ydb_scripting.h +++ b/ydb/services/ydb/ydb_scripting.h @@ -3,31 +3,20 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_scripting_v1.grpc.pb.h> +#include <ydb/core/grpc_services/base/base_service.h> + namespace NKikimr { namespace NGRpcService { class TGRpcYdbScriptingService - : public NGrpc::TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService> + : public TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService> { public: - TGRpcYdbScriptingService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + using TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService>::TGrpcServiceBase; - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp index e8770c4d351..35d55fa4b9d 100644 --- a/ydb/services/ydb/ydb_table.cpp +++ b/ydb/services/ydb/ydb_table.cpp @@ -7,30 +7,6 @@ namespace NKikimr { namespace NGRpcService { -TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } - -void TGRpcYdbTableService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYdbTableService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYdbTableService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYdbTableService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); #ifdef ADD_REQUEST_LIMIT @@ -48,7 +24,7 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("table", #NAME))->Run(); @@ -59,7 +35,7 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ ActorSystem_->Send(GRpcRequestProxyId_, \ new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \ - (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("table", #NAME))->Run(); diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h index 083496f129e..38f9ea77c0b 100644 --- a/ydb/services/ydb/ydb_table.h +++ b/ydb/services/ydb/ydb_table.h @@ -3,32 +3,19 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> #include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> - +#include <ydb/core/grpc_services/base/base_service.h> namespace NKikimr { namespace NGRpcService { class TGRpcYdbTableService - : public NGrpc::TGrpcServiceBase<Ydb::Table::V1::TableService> + : public TGrpcServiceBase<Ydb::Table::V1::TableService> { public: - TGRpcYdbTableService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase; - bool IncRequest(); - void DecRequest(); private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; } // namespace NGRpcService |