aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2022-08-31 11:36:00 +0300
committerdcherednik <dcherednik@ydb.tech>2022-08-31 11:36:00 +0300
commitbcd6b2653a988889aa6d990f5f30ca0ecac41a54 (patch)
tree079d5a654214c9dcb20a3fc905e35237e99eb0b9
parentdb58955e8e90ba21f2d9b82fa9af131f40ff1c6f (diff)
downloadydb-bcd6b2653a988889aa6d990f5f30ca0ecac41a54.tar.gz
Pass RL enable/disable flag in to grpc service.
-rw-r--r--ydb/core/driver_lib/run/run.cpp167
-rw-r--r--ydb/core/grpc_services/base/base.h3
-rw-r--r--ydb/core/grpc_services/base/base_service.h66
-rw-r--r--ydb/core/protos/config.proto3
-rw-r--r--ydb/core/testlib/test_client.cpp38
-rw-r--r--ydb/services/auth/grpc_service.cpp27
-rw-r--r--ydb/services/auth/grpc_service.h20
-rw-r--r--ydb/services/cms/grpc_service.cpp30
-rw-r--r--ydb/services/cms/grpc_service.h24
-rw-r--r--ydb/services/datastreams/grpc_service.cpp30
-rw-r--r--ydb/services/datastreams/grpc_service.h21
-rw-r--r--ydb/services/discovery/grpc_service.cpp31
-rw-r--r--ydb/services/discovery/grpc_service.h23
-rw-r--r--ydb/services/kesus/grpc_service.cpp50
-rw-r--r--ydb/services/kesus/grpc_service.h25
-rw-r--r--ydb/services/local_discovery/grpc_service.h1
-rw-r--r--ydb/services/monitoring/grpc_service.cpp27
-rw-r--r--ydb/services/monitoring/grpc_service.h23
-rw-r--r--ydb/services/persqueue_v1/persqueue.cpp80
-rw-r--r--ydb/services/persqueue_v1/persqueue.h17
-rw-r--r--ydb/services/persqueue_v1/topic.cpp74
-rw-r--r--ydb/services/persqueue_v1/topic.h17
-rw-r--r--ydb/services/ydb/ydb_clickhouse_internal.cpp28
-rw-r--r--ydb/services/ydb/ydb_clickhouse_internal.h18
-rw-r--r--ydb/services/ydb/ydb_experimental.cpp24
-rw-r--r--ydb/services/ydb/ydb_experimental.h18
-rw-r--r--ydb/services/ydb/ydb_export.cpp24
-rw-r--r--ydb/services/ydb/ydb_export.h17
-rw-r--r--ydb/services/ydb/ydb_import.cpp24
-rw-r--r--ydb/services/ydb/ydb_import.h18
-rw-r--r--ydb/services/ydb/ydb_logstore.cpp25
-rw-r--r--ydb/services/ydb/ydb_logstore.h20
-rw-r--r--ydb/services/ydb/ydb_long_tx.cpp27
-rw-r--r--ydb/services/ydb/ydb_long_tx.h19
-rw-r--r--ydb/services/ydb/ydb_operation.cpp26
-rw-r--r--ydb/services/ydb/ydb_operation.h18
-rw-r--r--ydb/services/ydb/ydb_s3_internal.cpp24
-rw-r--r--ydb/services/ydb/ydb_s3_internal.h18
-rw-r--r--ydb/services/ydb/ydb_scheme.cpp26
-rw-r--r--ydb/services/ydb/ydb_scheme.h16
-rw-r--r--ydb/services/ydb/ydb_scripting.cpp30
-rw-r--r--ydb/services/ydb/ydb_scripting.h19
-rw-r--r--ydb/services/ydb/ydb_table.cpp28
-rw-r--r--ydb/services/ydb/ydb_table.h19
44 files changed, 369 insertions, 914 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 6e6686b32b1..25f98caaf3c 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -496,64 +496,85 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NGrpc::TGRpcServer& server, NGrpc::TServerOptions& opts) {
const auto& services = grpcConfig.GetServices();
+ const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled();
+ const auto& rlServicesDisabled = grpcConfig.GetRatelimiterServicesDisabled();
+
+ class TServiceCfg {
+ public:
+ TServiceCfg(bool enabled)
+ : ServiceEnabled(enabled)
+ { }
+ operator bool() const {
+ return ServiceEnabled;
+ }
+ bool IsRlAllowed() const {
+ return RlAllowed;
+ }
+ void SetRlAllowed(bool allowed) {
+ RlAllowed = allowed;
+ }
+ private:
+ bool ServiceEnabled = false;
+ bool RlAllowed = false;
+ };
- std::unordered_map<TString, bool*> names;
+ std::unordered_map<TString, TServiceCfg*> names;
- bool hasLegacy = opts.SslData.Empty() && services.empty();
+ TServiceCfg hasLegacy = opts.SslData.Empty() && services.empty();
names["legacy"] = &hasLegacy;
- bool hasScripting = services.empty();
+ TServiceCfg hasScripting = services.empty();
names["scripting"] = &hasScripting;
- bool hasCms = services.empty();
+ TServiceCfg hasCms = services.empty();
names["cms"] = &hasCms;
- bool hasKesus = services.empty();
+ TServiceCfg hasKesus = services.empty();
names["locking"] = names["kesus"] = &hasKesus;
- bool hasMonitoring = services.empty();
+ TServiceCfg hasMonitoring = services.empty();
names["monitoring"] = &hasMonitoring;
- bool hasDiscovery = services.empty();
+ TServiceCfg hasDiscovery = services.empty();
names["discovery"] = &hasDiscovery;
- bool hasLocalDiscovery = false;
+ TServiceCfg hasLocalDiscovery = false;
names["local_discovery"] = &hasLocalDiscovery;
- bool hasTableService = services.empty();
+ TServiceCfg hasTableService = services.empty();
names["table_service"] = &hasTableService;
- bool hasSchemeService = false;
- bool hasOperationService = false;
- bool hasYql = false;
+ TServiceCfg hasSchemeService = false;
+ TServiceCfg hasOperationService = false;
+ TServiceCfg hasYql = false;
names["yql"] = &hasYql;
- bool hasYqlInternal = services.empty();
+ TServiceCfg hasYqlInternal = services.empty();
names["yql_internal"] = &hasYqlInternal;
- bool hasPQ = services.empty();
+ TServiceCfg hasPQ = services.empty();
names["pq"] = &hasPQ;
- bool hasPQv1 = services.empty();
+ TServiceCfg hasPQv1 = services.empty();
names["pqv1"] = &hasPQv1;
- bool hasTopic = services.empty();
+ TServiceCfg hasTopic = services.empty();
names["topic"] = &hasTopic;
- bool hasPQCD = services.empty();
+ TServiceCfg hasPQCD = services.empty();
names["pqcd"] = &hasPQCD;
- bool hasS3Internal = false;
+ TServiceCfg hasS3Internal = false;
names["s3_internal"] = &hasS3Internal;
- bool hasExperimental = false;
+ TServiceCfg hasExperimental = false;
names["experimental"] = &hasExperimental;
- bool hasClickhouseInternal = services.empty();
+ TServiceCfg hasClickhouseInternal = services.empty();
names["clickhouse_internal"] = &hasClickhouseInternal;
- bool hasRateLimiter = false;
+ TServiceCfg hasRateLimiter = false;
names["rate_limiter"] = &hasRateLimiter;
- bool hasLongTx = false;
+ TServiceCfg hasLongTx = false;
names["long_tx"] = &hasLongTx;
- bool hasExport = services.empty();
+ TServiceCfg hasExport = services.empty();
names["export"] = &hasExport;
- bool hasImport = services.empty();
+ TServiceCfg hasImport = services.empty();
names["import"] = &hasImport;
- bool hasAnalytics = false;
+ TServiceCfg hasAnalytics = false;
names["analytics"] = &hasAnalytics;
- bool hasDataStreams = false;
+ TServiceCfg hasDataStreams = false;
names["datastreams"] = &hasDataStreams;
- bool hasYandexQuery = false;
+ TServiceCfg hasYandexQuery = false;
names["yq"] = &hasYandexQuery;
- bool hasYandexQueryPrivate = false;
+ TServiceCfg hasYandexQueryPrivate = false;
names["yq_private"] = &hasYandexQueryPrivate;
- bool hasLogStore = false;
+ TServiceCfg hasLogStore = false;
names["logstore"] = &hasLogStore;
- bool hasAuth = services.empty();
+ TServiceCfg hasAuth = services.empty();
names["auth"] = &hasAuth;
std::unordered_set<TString> enabled;
@@ -609,6 +630,34 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
hasImport = true;
}
+ // Enable RL for all services if enabled list is empty
+ if (rlServicesEnabled.empty()) {
+ for (auto& [name, cfg] : names) {
+ Y_VERIFY(cfg);
+ cfg->SetRlAllowed(true);
+ }
+ } else {
+ for (const auto& name : rlServicesEnabled) {
+ auto itName = names.find(name);
+ if (itName != names.end()) {
+ Y_VERIFY(itName->second);
+ itName->second->SetRlAllowed(true);
+ } else if (!ModuleFactories || !ModuleFactories->GrpcServiceFactory.Has(name)) {
+ Cerr << "Unknown grpc service \"" << name << "\" rl was not enabled" << Endl;
+ }
+ }
+ }
+
+ for (const auto& name : rlServicesDisabled) {
+ auto itName = names.find(name);
+ if (itName != names.end()) {
+ Y_VERIFY(itName->second);
+ itName->second->SetRlAllowed(false);
+ } else if (!ModuleFactories || !ModuleFactories->GrpcServiceFactory.Has(name)) {
+ Cerr << "Unknown grpc service \"" << name << "\" rl was not disabled" << Endl;
+ }
+ }
+
for (const auto& [name, isEnabled] : names) {
if (*isEnabled) {
enabled.insert(name);
@@ -642,59 +691,70 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
if (hasTableService) {
- server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId,
+ hasTableService.IsRlAllowed()));
}
if (hasExperimental) {
server.AddService(new NGRpcService::TGRpcYdbExperimentalService(ActorSystem.Get(), Counters,
- grpcRequestProxyId));
+ grpcRequestProxyId, hasExperimental.IsRlAllowed()));
}
if (hasClickhouseInternal) {
server.AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(ActorSystem.Get(), Counters,
- AppData->InFlightLimiterRegistry, grpcRequestProxyId));
+ AppData->InFlightLimiterRegistry, grpcRequestProxyId, hasClickhouseInternal.IsRlAllowed()));
}
if (hasS3Internal) {
server.AddService(new NGRpcService::TGRpcYdbS3InternalService(ActorSystem.Get(), Counters,
- grpcRequestProxyId));
+ grpcRequestProxyId, hasS3Internal.IsRlAllowed()));
}
if (hasScripting) {
server.AddService(new NGRpcService::TGRpcYdbScriptingService(ActorSystem.Get(), Counters,
- grpcRequestProxyId));
+ grpcRequestProxyId, hasScripting.IsRlAllowed()));
}
if (hasLongTx) {
- server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasLongTx.IsRlAllowed()));
}
if (hasSchemeService) {
- server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ // RPC RL enabled
+ // We have no way to disable or enable this service explicitly
+ server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, true /*hasSchemeService.IsRlAllowed()*/));
}
if (hasOperationService) {
- server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasOperationService.IsRlAllowed()));
}
if (hasExport) {
- server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasExport.IsRlAllowed()));
}
if (hasImport) {
- server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasImport.IsRlAllowed()));
}
if (hasKesus) {
- server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasKesus.IsRlAllowed()));
}
if (hasPQv1) {
- server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
+ server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
+ grpcRequestProxyId, hasPQv1.IsRlAllowed()));
}
if (hasPQv1 || hasTopic) {
- server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
+ server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
+ grpcRequestProxyId, hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed()));
}
if (hasPQCD) {
@@ -710,15 +770,18 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
if (hasCms) {
- server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasCms.IsRlAllowed()));
}
if (hasDiscovery) {
- server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasDiscovery.IsRlAllowed()));
}
if (hasLocalDiscovery) {
- server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters,
+ grpcRequestProxyId));
}
if (hasRateLimiter) {
@@ -726,15 +789,18 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
if (hasMonitoring) {
- server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasMonitoring.IsRlAllowed()));
}
if (hasAuth) {
- server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasAuth.IsRlAllowed()));
}
if (hasDataStreams) {
- server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasDataStreams.IsRlAllowed()));
}
if (hasYandexQuery) {
@@ -746,7 +812,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
if (hasLogStore) {
- server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasLogStore.IsRlAllowed()));
}
if (ModuleFactories) {
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 6f236c993b7..ac0dd35b3e6 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -315,6 +315,9 @@ enum class TRateLimiterMode : ui8 {
RuOnProgress = 3,
};
+#define RLSWITCH(mode) \
+ IsRlAllowed() ? mode : TRateLimiterMode::Off
+
class ICheckerIface;
// The way to pass some common data to request processing
diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h
new file mode 100644
index 00000000000..52db0b46df4
--- /dev/null
+++ b/ydb/core/grpc_services/base/base_service.h
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <library/cpp/grpc/server/grpc_request_base.h>
+#include <library/cpp/grpc/server/logger.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class TGrpcServiceCfg {
+public:
+ TGrpcServiceCfg(bool rlAllowed)
+ : RlAllowed_(rlAllowed)
+ { }
+
+ bool IsRlAllowed() const {
+ return RlAllowed_;
+ }
+private:
+ const bool RlAllowed_;
+};
+
+template <typename T>
+class TGrpcServiceBase
+ : public NGrpc::TGrpcServiceBase<T>
+ , public TGrpcServiceCfg
+{
+public:
+ TGrpcServiceBase(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id, bool rlAllowed)
+ : TGrpcServiceCfg(rlAllowed)
+ , ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(id)
+{ }
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override {
+ CQ_ = cq;
+ SetupIncomingRequests(std::move(logger));
+ }
+
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override {
+ Limiter_ = limiter;
+ }
+
+ bool IncRequest() {
+ return Limiter_->Inc();
+ }
+
+ void DecRequest() {
+ Limiter_->Dec();
+ Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
+ }
+
+protected:
+ virtual void SetupIncomingRequests(NGrpc::TLoggerPtr logger) = 0;
+
+ NActors::TActorSystem* ActorSystem_;
+ grpc::ServerCompletionQueue* CQ_ = nullptr;
+
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
+ const NActors::TActorId GRpcRequestProxyId_;
+
+ NGrpc::TGlobalLimiter* Limiter_ = nullptr;
+};
+
+}
+}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 6bc5f12f4e8..89177e53dbe 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -612,6 +612,9 @@ message TGRpcConfig {
optional bool SkipSchemeCheck = 24 [default = false];
+ repeated string RatelimiterServicesEnabled = 25;
+ repeated string RatelimiterServicesDisabled = 26;
+
// server socket options
optional bool KeepAliveEnable = 100 [default = true]; // SO_KEEPALIVE
optional uint32 KeepAliveIdleTimeoutTriggerSec = 101 [default = 90]; // TCP_KEEPIDLE
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index a34653f9546..fb2d8cf6fd0 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -307,25 +307,25 @@ namespace Tests {
future.Subscribe(startCb);
GRpcServer->AddService(grpcService);
- GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbExperimentalService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbS3InternalService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbExperimentalService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbS3InternalService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true));
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId));
@@ -338,8 +338,8 @@ namespace Tests {
GRpcServer->AddService(service);
}
}
- GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId, true));
GRpcServer->Start();
}
diff --git a/ydb/services/auth/grpc_service.cpp b/ydb/services/auth/grpc_service.cpp
index 996cd501684..8a2eac7acbe 100644
--- a/ydb/services/auth/grpc_service.cpp
+++ b/ydb/services/auth/grpc_service.cpp
@@ -14,33 +14,6 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) {
return TString{res[0]};
}
-TGRpcAuthService::TGRpcAuthService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{
-}
-
-void TGRpcAuthService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcAuthService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcAuthService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcAuthService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcAuthService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
#ifdef ADD_REQUEST
diff --git a/ydb/services/auth/grpc_service.h b/ydb/services/auth/grpc_service.h
index 61e2e884c96..09fa4b4e875 100644
--- a/ydb/services/auth/grpc_service.h
+++ b/ydb/services/auth/grpc_service.h
@@ -3,33 +3,19 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
- class TGRpcAuthService : public NGrpc::TGrpcServiceBase<Ydb::Auth::V1::AuthService>
+ class TGRpcAuthService : public TGrpcServiceBase<Ydb::Auth::V1::AuthService>
{
public:
- TGRpcAuthService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using TGrpcServiceBase<Ydb::Auth::V1::AuthService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/cms/grpc_service.cpp b/ydb/services/cms/grpc_service.cpp
index 5905ab33859..728a578c488 100644
--- a/ydb/services/cms/grpc_service.cpp
+++ b/ydb/services/cms/grpc_service.cpp
@@ -7,34 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcCmsService::TGRpcCmsService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{
-}
-
-void TGRpcCmsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcCmsService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter)
-{
- Limiter_ = limiter;
-}
-
-bool TGRpcCmsService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcCmsService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcCmsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
using namespace Ydb;
@@ -49,7 +21,7 @@ void TGRpcCmsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TGrpcRequestOperationCall<Cms::NAME##Request, Cms::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
}, &Cms::V1::CmsService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("cms", #NAME))->Run();
diff --git a/ydb/services/cms/grpc_service.h b/ydb/services/cms/grpc_service.h
index 8367051cce5..967eed68c95 100644
--- a/ydb/services/cms/grpc_service.h
+++ b/ydb/services/cms/grpc_service.h
@@ -3,34 +3,18 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_cms_v1.grpc.pb.h>
-
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcCmsService
- : public NGrpc::TGrpcServiceBase<Ydb::Cms::V1::CmsService>
+ : public TGrpcServiceBase<Ydb::Cms::V1::CmsService>
{
public:
- TGRpcCmsService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
-
- private:
+ using TGrpcServiceBase<Ydb::Cms::V1::CmsService>::TGrpcServiceBase;
+private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp
index 5337821d746..7df7b15f819 100644
--- a/ydb/services/datastreams/grpc_service.cpp
+++ b/ydb/services/datastreams/grpc_service.cpp
@@ -37,36 +37,6 @@ void YdsProcessAttr(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData,
namespace NKikimr::NGRpcService {
-TGRpcDataStreamsService::TGRpcDataStreamsService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{
-}
-
-void TGRpcDataStreamsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger)
-{
- CQ_ = cq;
-
- SetupIncomingRequests(logger);
-}
-
-void TGRpcDataStreamsService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcDataStreamsService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcDataStreamsService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
-
void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger)
{
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/datastreams/grpc_service.h b/ydb/services/datastreams/grpc_service.h
index 0c80b53ee8f..0dbab489933 100644
--- a/ydb/services/datastreams/grpc_service.h
+++ b/ydb/services/datastreams/grpc_service.h
@@ -3,31 +3,16 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr::NGRpcService {
- class TGRpcDataStreamsService : public NGrpc::TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService>
+ class TGRpcDataStreamsService : public TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService>
{
public:
- TGRpcDataStreamsService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
-
+ using TGrpcServiceBase<Ydb::DataStreams::V1::DataStreamsService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
}
diff --git a/ydb/services/discovery/grpc_service.cpp b/ydb/services/discovery/grpc_service.cpp
index 2df0e98629b..2ffebc9ff36 100644
--- a/ydb/services/discovery/grpc_service.cpp
+++ b/ydb/services/discovery/grpc_service.cpp
@@ -16,34 +16,7 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) {
return TString{res[0]};
}
-TGRpcDiscoveryService::TGRpcDiscoveryService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
- {
- }
-
- void TGRpcDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
- }
-
- void TGRpcDiscoveryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) {
- Limiter_ = limiter;
- }
-
- bool TGRpcDiscoveryService::IncRequest() {
- return Limiter_->Inc();
- }
-
- void TGRpcDiscoveryService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
- }
-
- void TGRpcDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+void TGRpcDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
using namespace Ydb;
#ifdef ADD_REQUEST
@@ -56,7 +29,7 @@ TGRpcDiscoveryService::TGRpcDiscoveryService(NActors::TActorSystem *system,
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TGrpcRequestOperationCall<Discovery::NAME##Request, Discovery::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
}, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("discovery", #NAME))->Run();
diff --git a/ydb/services/discovery/grpc_service.h b/ydb/services/discovery/grpc_service.h
index 4b36484577b..697f6e1c07b 100644
--- a/ydb/services/discovery/grpc_service.h
+++ b/ydb/services/discovery/grpc_service.h
@@ -6,32 +6,21 @@
#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
+
namespace NKikimr {
namespace NGRpcService {
class TGRpcDiscoveryService
- : public NGrpc::TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService>
+ : public TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService>
{
- public:
- TGRpcDiscoveryService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ public:
+ using TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/kesus/grpc_service.cpp b/ydb/services/kesus/grpc_service.cpp
index 7d991c50028..2ee2515df81 100644
--- a/ydb/services/kesus/grpc_service.cpp
+++ b/ydb/services/kesus/grpc_service.cpp
@@ -607,39 +607,9 @@ private:
////////////////////////////////////////////////////////////////////////////////
-TKesusGRpcService::TKesusGRpcService(
- NActors::TActorSystem* actorSystem,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem(actorSystem)
- , Counters(counters)
- , GRpcRequestProxyId(id)
-{}
-
-TKesusGRpcService::~TKesusGRpcService() {
- // empty
-}
-
-void TKesusGRpcService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- CQ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TKesusGRpcService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TKesusGRpcService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TKesusGRpcService::DecRequest() {
- Limiter->Dec();
- Y_ASSERT(Limiter->GetCurrentInFlight() >= 0);
-}
-
void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
- auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem);
+ auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
+ using NGRpcService::TRateLimiterMode;
#ifdef ADD_REQUEST
#error ADD_REQUEST macro is already defined
@@ -649,12 +619,12 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \
this, \
&Service_, \
- CQ, \
+ CQ_, \
[this](NGrpc::IRequestContextBase* reqCtx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \
- ActorSystem->Send(GRpcRequestProxyId, \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \
- (reqCtx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \
+ (reqCtx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
}, \
&Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \
"Coordination/" #NAME, \
@@ -671,13 +641,13 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
TGRpcSessionActor::TGRpcRequest::Start(
this,
this->GetService(),
- CQ,
+ CQ_,
&Ydb::Coordination::V1::CoordinationService::AsyncService::RequestSession,
[this](TIntrusivePtr<TGRpcSessionActor::IContext> context) {
- NGRpcService::ReportGrpcReqToMon(*ActorSystem, context->GetPeerName());
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvCoordinationSessionRequest(context));
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, context->GetPeerName());
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCoordinationSessionRequest(context));
},
- *ActorSystem,
+ *ActorSystem_,
"Coordination/Session",
getCounterBlock("coordination", "Session", true, true),
/* TODO: limiter */ nullptr);
diff --git a/ydb/services/kesus/grpc_service.h b/ydb/services/kesus/grpc_service.h
index 6317fdf1dad..ab7ae0645eb 100644
--- a/ydb/services/kesus/grpc_service.h
+++ b/ydb/services/kesus/grpc_service.h
@@ -8,38 +8,23 @@
#include <util/generic/hash_set.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
+
namespace NKikimr {
namespace NKesus {
class TKesusGRpcService
- : public NGrpc::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>
+ : public ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>
{
class TContextBase;
class TSessionContext;
public:
- TKesusGRpcService(
- NActors::TActorSystem* actorSystem,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
- ~TKesusGRpcService();
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
-private:
- NActors::TActorSystem* ActorSystem;
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- NActors::TActorId GRpcRequestProxyId;
-
- grpc::ServerCompletionQueue* CQ = nullptr;
- NGrpc::TGlobalLimiter* Limiter = nullptr;
};
}
diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h
index 02b5232174a..f58e81811c5 100644
--- a/ydb/services/local_discovery/grpc_service.h
+++ b/ydb/services/local_discovery/grpc_service.h
@@ -6,6 +6,7 @@
#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
diff --git a/ydb/services/monitoring/grpc_service.cpp b/ydb/services/monitoring/grpc_service.cpp
index 6106b26087a..cd25b10ae3b 100644
--- a/ydb/services/monitoring/grpc_service.cpp
+++ b/ydb/services/monitoring/grpc_service.cpp
@@ -17,33 +17,6 @@ static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) {
return TString{res[0]};
}
-TGRpcMonitoringService::TGRpcMonitoringService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{
-}
-
-void TGRpcMonitoringService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcMonitoringService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcMonitoringService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcMonitoringService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcMonitoringService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
using namespace Ydb;
diff --git a/ydb/services/monitoring/grpc_service.h b/ydb/services/monitoring/grpc_service.h
index f9d2c7e6a52..93dfce7a1bb 100644
--- a/ydb/services/monitoring/grpc_service.h
+++ b/ydb/services/monitoring/grpc_service.h
@@ -5,32 +5,17 @@
#include <ydb/public/api/grpc/ydb_monitoring_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
- class TGRpcMonitoringService : public NGrpc::TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService>
+ class TGRpcMonitoringService : public TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService>
{
public:
- TGRpcMonitoringService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
-
- private:
+ using TGrpcServiceBase<Ydb::Monitoring::V1::MonitoringService>::TGrpcServiceBase;
+ private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp
index 74ea2f42d4f..9a319d9d785 100644
--- a/ydb/services/persqueue_v1/persqueue.cpp
+++ b/ydb/services/persqueue_v1/persqueue.cpp
@@ -17,58 +17,44 @@ namespace V1 {
static const ui32 PersQueueWriteSessionsMaxCount = 1000000;
static const ui32 PersQueueReadSessionsMaxCount = 100000;
-TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy)
- : ActorSystem(system)
- , Counters(counters)
+TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed)
+ : TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>(system, counters, grpcRequestProxy, rlAllowed)
, SchemeCache(schemeCache)
- , GRpcRequestProxy(grpcRequestProxy)
{ }
void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ = cq;
+ CQ_ = cq;
InitNewSchemeCacheActor();
- if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
+ if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
- IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, PersQueueWriteSessionsMaxCount);
- TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
+ IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, PersQueueWriteSessionsMaxCount);
+ TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
- IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, PersQueueReadSessionsMaxCount);
- actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
+ IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, PersQueueReadSessionsMaxCount);
+ actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
- IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters);
- actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
+ IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_);
+ actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
SetupIncomingRequests(std::move(logger));
}
}
-void TGRpcPersQueueService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TGRpcPersQueueService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TGRpcPersQueueService::DecRequest() {
- Limiter->Dec();
-}
-
void TGRpcPersQueueService::InitNewSchemeCacheActor() {
- auto appData = ActorSystem->AppData<TAppData>();
- auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache");
+ auto appData = ActorSystem_->AppData<TAppData>();
+ auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache");
auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
- NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
- TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
+ TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
}
void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
- auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
+ auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
{
using TBiRequest = Ydb::PersQueue::V1::StreamingWriteClientMessage;
@@ -82,11 +68,11 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NKikimrServices::GRPC_SERVER>;
- TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite,
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestStreamingWrite,
[this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
- ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamPQWriteRequest(context));
},
- *ActorSystem, "PersQueueService/CreateWriteSession", getCounterBlock("persistent_queue", "WriteSession", true, true), nullptr
+ *ActorSystem_, "PersQueueService/CreateWriteSession", getCounterBlock("persistent_queue", "WriteSession", true, true), nullptr
);
}
@@ -102,11 +88,11 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NKikimrServices::GRPC_SERVER>;
- TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestMigrationStreamingRead,
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::PersQueue::V1::PersQueueService::AsyncService::RequestMigrationStreamingRead,
[this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
- ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest(context));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest(context));
},
- *ActorSystem, "PersQueueService/CreateMigrationReadSession", getCounterBlock("persistent_queue", "MigrationReadSession", true, true), nullptr
+ *ActorSystem_, "PersQueueService/CreateMigrationReadSession", getCounterBlock("persistent_queue", "MigrationReadSession", true, true), nullptr
);
}
@@ -114,35 +100,35 @@ void TGRpcPersQueueService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#error ADD_REQUEST macro already defined
#endif
#define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ, \
+ MakeIntrusive<TGRpcRequest<Ydb::PersQueue::V1::IN, Ydb::PersQueue::V1::OUT, NGRpcService::V1::TGRpcPersQueueService>>(this, this->GetService(), CQ_, \
[this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ACTION; \
}, &Ydb::PersQueue::V1::SVC::AsyncService::Request ## NAME, \
"PersQueueService/"#NAME, logger, getCounterBlock("persistent_queue", #NAME))->Run();
ADD_REQUEST(GetReadSessionsInfo, PersQueueService, ReadInfoRequest, ReadInfoResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQReadInfoRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQReadInfoRequest(ctx));
})
ADD_REQUEST(DropTopic, PersQueueService, DropTopicRequest, DropTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDropTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDropTopicRequest(ctx));
})
ADD_REQUEST(CreateTopic, PersQueueService, CreateTopicRequest, CreateTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQCreateTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQCreateTopicRequest(ctx));
})
ADD_REQUEST(AlterTopic, PersQueueService, AlterTopicRequest, AlterTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAlterTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAlterTopicRequest(ctx));
})
ADD_REQUEST(DescribeTopic, PersQueueService, DescribeTopicRequest, DescribeTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDescribeTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDescribeTopicRequest(ctx));
})
ADD_REQUEST(AddReadRule, PersQueueService, AddReadRuleRequest, AddReadRuleResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAddReadRuleRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAddReadRuleRequest(ctx));
})
ADD_REQUEST(RemoveReadRule, PersQueueService, RemoveReadRuleRequest, RemoveReadRuleResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx));
})
#undef ADD_REQUEST
diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h
index 7646d3a6093..cabcaa18f0f 100644
--- a/ydb/services/persqueue_v1/persqueue.h
+++ b/ydb/services/persqueue_v1/persqueue.h
@@ -5,7 +5,7 @@
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
-
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
@@ -13,33 +13,24 @@ namespace NGRpcService {
namespace V1 {
class TGRpcPersQueueService
- : public NGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>
+ : public TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>
{
public:
- TGRpcPersQueueService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy);
+ TGRpcPersQueueService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy, bool rlAllowed);
void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
void StopService() noexcept override;
using NGrpc::TGrpcServiceBase<Ydb::PersQueue::V1::PersQueueService>::GetService;
- bool IncRequest();
- void DecRequest();
private:
- void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override;
void InitNewSchemeCacheActor();
- NActors::TActorSystem* ActorSystem;
- grpc::ServerCompletionQueue* CQ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- NGrpc::TGlobalLimiter* Limiter = nullptr;
NActors::TActorId SchemeCache;
NActors::TActorId NewSchemeCache;
- NActors::TActorId GRpcRequestProxy;
};
} // namespace V1
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index 9d01f2686a6..8dd541760e5 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -17,58 +17,44 @@ namespace V1 {
static const ui32 TopicWriteSessionsMaxCount = 1000000;
static const ui32 TopicReadSessionsMaxCount = 100000;
-TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy)
- : ActorSystem(system)
- , Counters(counters)
+TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy, bool rlAllowed)
+ : TGrpcServiceBase<Ydb::Topic::V1::TopicService>(system, counters, grpcRequestProxy, rlAllowed)
, SchemeCache(schemeCache)
- , GRpcRequestProxy(grpcRequestProxy)
{ }
void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ = cq;
+ CQ_ = cq;
InitNewSchemeCacheActor();
- if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
+ if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
- IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount);
- TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
+ IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters_, TopicWriteSessionsMaxCount);
+ TActorId actorId = ActorSystem_->Register(writeSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
- IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount);
- actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
+ IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters_, TopicReadSessionsMaxCount);
+ actorId = ActorSystem_->Register(readSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
- IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters);
- actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
- ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
+ IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters_);
+ actorId = ActorSystem_->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
+ ActorSystem_->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
SetupIncomingRequests(std::move(logger));
}
}
-void TGRpcTopicService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter = limiter;
-}
-
-bool TGRpcTopicService::IncRequest() {
- return Limiter->Inc();
-}
-
-void TGRpcTopicService::DecRequest() {
- Limiter->Dec();
-}
-
void TGRpcTopicService::InitNewSchemeCacheActor() {
- auto appData = ActorSystem->AppData<TAppData>();
- auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache");
+ auto appData = ActorSystem_->AppData<TAppData>();
+ auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache");
auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
- NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
- TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
+ TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
}
void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
- auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
+ auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
{
using TBiRequest = Ydb::Topic::StreamWriteMessage::FromClient;
@@ -82,11 +68,11 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NKikimrServices::GRPC_SERVER>;
- TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite,
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite,
[this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
- ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context));
},
- *ActorSystem, "TopicService/StreamWrite", getCounterBlock("topic", "StreamWrite", true, true), nullptr
+ *ActorSystem_, "TopicService/StreamWrite", getCounterBlock("topic", "StreamWrite", true, true), nullptr
);
}
@@ -102,11 +88,11 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NKikimrServices::GRPC_SERVER>;
- TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead,
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ_, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead,
[this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
- ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context));
},
- *ActorSystem, "TopicService/StreamRead", getCounterBlock("topic", "StreamRead", true, true), nullptr
+ *ActorSystem_, "TopicService/StreamRead", getCounterBlock("topic", "StreamRead", true, true), nullptr
);
}
@@ -114,24 +100,24 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#error ADD_REQUEST macro already defined
#endif
#define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ, \
+ MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ_, \
[this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ACTION; \
}, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \
"TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run();
ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvDropTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx));
})
ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvCreateTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCreateTopicRequest(ctx));
})
ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvAlterTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvAlterTopicRequest(ctx));
})
ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, {
- ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvDescribeTopicRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx));
})
#undef ADD_REQUEST
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
index 9539d63b32f..a75f1b83754 100644
--- a/ydb/services/persqueue_v1/topic.h
+++ b/ydb/services/persqueue_v1/topic.h
@@ -6,6 +6,8 @@
#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
namespace NKikimr {
@@ -13,33 +15,24 @@ namespace NGRpcService {
namespace V1 {
class TGRpcTopicService
- : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>
+ : public TGrpcServiceBase<Ydb::Topic::V1::TopicService>
{
public:
- TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy);
+ TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy, bool rlAllowed);
void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
void StopService() noexcept override;
using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService;
- bool IncRequest();
- void DecRequest();
private:
- void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger) override;
void InitNewSchemeCacheActor();
- NActors::TActorSystem* ActorSystem;
- grpc::ServerCompletionQueue* CQ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
- NGrpc::TGlobalLimiter* Limiter = nullptr;
NActors::TActorId SchemeCache;
NActors::TActorId NewSchemeCache;
- NActors::TActorId GRpcRequestProxy;
};
} // namespace V1
diff --git a/ydb/services/ydb/ydb_clickhouse_internal.cpp b/ydb/services/ydb/ydb_clickhouse_internal.cpp
index a2f0208cf72..30f3615ca09 100644
--- a/ydb/services/ydb/ydb_clickhouse_internal.cpp
+++ b/ydb/services/ydb/ydb_clickhouse_internal.cpp
@@ -10,29 +10,11 @@ namespace NGRpcService {
TGRpcYdbClickhouseInternalService::TGRpcYdbClickhouseInternalService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TIntrusivePtr<TInFlightLimiterRegistry> inFlightLimiterRegistry,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
+ NActors::TActorId id,
+ bool rlAllowed)
+ : TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService>(system, counters, id, rlAllowed)
, LimiterRegistry_(inFlightLimiterRegistry)
- , GRpcRequestProxyId_(id) {}
-
-void TGRpcYdbClickhouseInternalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbClickhouseInternalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbClickhouseInternalService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbClickhouseInternalService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
+{}
void TGRpcYdbClickhouseInternalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
@@ -47,7 +29,7 @@ void TGRpcYdbClickhouseInternalService::SetupIncomingRequests(NGrpc::TLoggerPtr
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new NGRpcService::TGrpcRequestOperationCall<Ydb::ClickhouseInternal::IN, Ydb::ClickhouseInternal::OUT> \
- (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \
+ (ctx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(NGRpcService::TRateLimiterMode::Rps), nullptr})); \
}, &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("clickhouse_internal", #NAME), getLimiter("ClickhouseInternal", #NAME, DEFAULT_MAX_IN_FLIGHT))->Run();
diff --git a/ydb/services/ydb/ydb_clickhouse_internal.h b/ydb/services/ydb/ydb_clickhouse_internal.h
index 87c889d1823..d57b776bd62 100644
--- a/ydb/services/ydb/ydb_clickhouse_internal.h
+++ b/ydb/services/ydb/ydb_clickhouse_internal.h
@@ -7,34 +7,26 @@
#include <library/cpp/actors/core/actorsystem.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
+
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbClickhouseInternalService
- : public NGrpc::TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService>
+ : public TGrpcServiceBase<Ydb::ClickhouseInternal::V1::ClickhouseInternalService>
{
private:
constexpr static i64 DEFAULT_MAX_IN_FLIGHT = 200;
public:
TGRpcYdbClickhouseInternalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> inFlightLimiterRegistry, NActors::TActorId id);
+ TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> inFlightLimiterRegistry, NActors::TActorId id, bool rlAllowed);
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> LimiterRegistry_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_experimental.cpp b/ydb/services/ydb/ydb_experimental.cpp
index 3b40907b189..c5904aef474 100644
--- a/ydb/services/ydb/ydb_experimental.cpp
+++ b/ydb/services/ydb/ydb_experimental.cpp
@@ -7,30 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbExperimentalService::TGRpcYdbExperimentalService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id) {}
-
-void TGRpcYdbExperimentalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbExperimentalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbExperimentalService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbExperimentalService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbExperimentalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_experimental.h b/ydb/services/ydb/ydb_experimental.h
index 206e0f9c85f..e13f11a7906 100644
--- a/ydb/services/ydb/ydb_experimental.h
+++ b/ydb/services/ydb/ydb_experimental.h
@@ -5,31 +5,19 @@
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/draft/ydb_experimental_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbExperimentalService
- : public NGrpc::TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService>
+ : public TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService>
{
public:
- TGRpcYdbExperimentalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
+ using TGrpcServiceBase<Ydb::Experimental::V1::ExperimentalService>::TGrpcServiceBase;
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_export.cpp b/ydb/services/ydb/ydb_export.cpp
index cd56a41885d..5b279782db2 100644
--- a/ydb/services/ydb/ydb_export.cpp
+++ b/ydb/services/ydb/ydb_export.cpp
@@ -7,30 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbExportService::TGRpcYdbExportService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
-
-void TGRpcYdbExportService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbExportService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbExportService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbExportService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbExportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_export.h b/ydb/services/ydb/ydb_export.h
index 1e16f4d8258..44258ec94d8 100644
--- a/ydb/services/ydb/ydb_export.h
+++ b/ydb/services/ydb/ydb_export.h
@@ -3,30 +3,19 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_export_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbExportService
- : public NGrpc::TGrpcServiceBase<Ydb::Export::V1::ExportService>
+ : public TGrpcServiceBase<Ydb::Export::V1::ExportService>
{
public:
- TGRpcYdbExportService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using TGrpcServiceBase<Ydb::Export::V1::ExportService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_import.cpp b/ydb/services/ydb/ydb_import.cpp
index 029fcbba4a4..300747d7cc0 100644
--- a/ydb/services/ydb/ydb_import.cpp
+++ b/ydb/services/ydb/ydb_import.cpp
@@ -7,30 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbImportService::TGRpcYdbImportService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
-
-void TGRpcYdbImportService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbImportService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbImportService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbImportService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbImportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_import.h b/ydb/services/ydb/ydb_import.h
index 13e74d59036..bc05afffcea 100644
--- a/ydb/services/ydb/ydb_import.h
+++ b/ydb/services/ydb/ydb_import.h
@@ -3,30 +3,20 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_import_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbImportService
- : public NGrpc::TGrpcServiceBase<Ydb::Import::V1::ImportService>
+ : public TGrpcServiceBase<Ydb::Import::V1::ImportService>
{
public:
- TGRpcYdbImportService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+ using TGrpcServiceBase<Ydb::Import::V1::ImportService>::TGrpcServiceBase;
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_logstore.cpp b/ydb/services/ydb/ydb_logstore.cpp
index 0e151a9ac46..9b07a408fc3 100644
--- a/ydb/services/ydb/ydb_logstore.cpp
+++ b/ydb/services/ydb/ydb_logstore.cpp
@@ -7,31 +7,6 @@
namespace NKikimr::NGRpcService {
-TGRpcYdbLogStoreService::TGRpcYdbLogStoreService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id) {}
-
-void TGRpcYdbLogStoreService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbLogStoreService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbLogStoreService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbLogStoreService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbLogStoreService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
using namespace Ydb;
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_logstore.h b/ydb/services/ydb/ydb_logstore.h
index 289bd296bfe..460c9683a0f 100644
--- a/ydb/services/ydb/ydb_logstore.h
+++ b/ydb/services/ydb/ydb_logstore.h
@@ -5,29 +5,17 @@
#include <library/cpp/grpc/server/grpc_server.h>
#include <library/cpp/actors/core/actorsystem.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
namespace NKikimr::NGRpcService {
class TGRpcYdbLogStoreService
- : public NGrpc::TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService>
+ : public TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService>
{
public:
- TGRpcYdbLogStoreService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using TGrpcServiceBase<Ydb::LogStore::V1::LogStoreService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
}
diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp
index 4c7aadcedaf..9f7d6fe355e 100644
--- a/ydb/services/ydb/ydb_long_tx.cpp
+++ b/ydb/services/ydb/ydb_long_tx.cpp
@@ -7,33 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-
-TGRpcYdbLongTxService::TGRpcYdbLongTxService(NActors::TActorSystem* system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{}
-
-void TGRpcYdbLongTxService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbLongTxService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbLongTxService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbLongTxService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbLongTxService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_long_tx.h b/ydb/services/ydb/ydb_long_tx.h
index 53d50b3c357..75ffe47800e 100644
--- a/ydb/services/ydb/ydb_long_tx.h
+++ b/ydb/services/ydb/ydb_long_tx.h
@@ -3,31 +3,18 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbLongTxService
- : public NGrpc::TGrpcServiceBase<Ydb::LongTx::V1::LongTxService>
+ : public TGrpcServiceBase<Ydb::LongTx::V1::LongTxService>
{
public:
- TGRpcYdbLongTxService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using TGrpcServiceBase<Ydb::LongTx::V1::LongTxService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_operation.cpp b/ydb/services/ydb/ydb_operation.cpp
index c611f13ac15..f202a0b8c8c 100644
--- a/ydb/services/ydb/ydb_operation.cpp
+++ b/ydb/services/ydb/ydb_operation.cpp
@@ -8,30 +8,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcOperationService::TGRpcOperationService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
-
-void TGRpcOperationService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcOperationService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcOperationService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcOperationService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcOperationService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
using namespace Ydb;
@@ -46,7 +22,7 @@ void TGRpcOperationService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TCALL<Operations::NAME##Request, Operations::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
}, &Operation::V1::OperationService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("operation", #NAME))->Run();
diff --git a/ydb/services/ydb/ydb_operation.h b/ydb/services/ydb/ydb_operation.h
index fc2ea409eb5..2e3959997a5 100644
--- a/ydb/services/ydb/ydb_operation.h
+++ b/ydb/services/ydb/ydb_operation.h
@@ -3,30 +3,18 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcOperationService
- : public NGrpc::TGrpcServiceBase<Ydb::Operation::V1::OperationService>
+ : public TGrpcServiceBase<Ydb::Operation::V1::OperationService>
{
public:
- TGRpcOperationService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
+ using TGrpcServiceBase<Ydb::Operation::V1::OperationService>::TGrpcServiceBase;
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_s3_internal.cpp b/ydb/services/ydb/ydb_s3_internal.cpp
index 326097b8724..3dcada013e6 100644
--- a/ydb/services/ydb/ydb_s3_internal.cpp
+++ b/ydb/services/ydb/ydb_s3_internal.cpp
@@ -7,30 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbS3InternalService::TGRpcYdbS3InternalService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id) {}
-
-void TGRpcYdbS3InternalService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbS3InternalService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbS3InternalService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbS3InternalService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbS3InternalService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
diff --git a/ydb/services/ydb/ydb_s3_internal.h b/ydb/services/ydb/ydb_s3_internal.h
index 58c2559eb73..9306ed09f1f 100644
--- a/ydb/services/ydb/ydb_s3_internal.h
+++ b/ydb/services/ydb/ydb_s3_internal.h
@@ -3,31 +3,19 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/draft/ydb_s3_internal_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbS3InternalService
- : public NGrpc::TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService>
+ : public TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService>
{
public:
- TGRpcYdbS3InternalService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
+ using TGrpcServiceBase<Ydb::S3Internal::V1::S3InternalService>::TGrpcServiceBase;
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_scheme.cpp b/ydb/services/ydb/ydb_scheme.cpp
index 1c9195ee853..0bc6bc45b22 100644
--- a/ydb/services/ydb/ydb_scheme.cpp
+++ b/ydb/services/ydb/ydb_scheme.cpp
@@ -8,30 +8,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbSchemeService::TGRpcYdbSchemeService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
-
-void TGRpcYdbSchemeService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbSchemeService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbSchemeService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbSchemeService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbSchemeService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
@@ -45,7 +21,7 @@ void TGRpcYdbSchemeService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TGrpcRequestOperationCall<Ydb::Scheme::NAME##Request, Ydb::Scheme::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
}, &Ydb::Scheme::V1::SchemeService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("scheme", #NAME))->Run();
diff --git a/ydb/services/ydb/ydb_scheme.h b/ydb/services/ydb/ydb_scheme.h
index f35823e365a..9356c35e81d 100644
--- a/ydb/services/ydb/ydb_scheme.h
+++ b/ydb/services/ydb/ydb_scheme.h
@@ -3,30 +3,20 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_scheme_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbSchemeService
- : public NGrpc::TGrpcServiceBase<Ydb::Scheme::V1::SchemeService>
+ : public TGrpcServiceBase<Ydb::Scheme::V1::SchemeService>
{
public:
- TGRpcYdbSchemeService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id);
+ using TGrpcServiceBase<Ydb::Scheme::V1::SchemeService>::TGrpcServiceBase;
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
-
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_scripting.cpp b/ydb/services/ydb/ydb_scripting.cpp
index e5164beb9c4..7562d31cc16 100644
--- a/ydb/services/ydb/ydb_scripting.cpp
+++ b/ydb/services/ydb/ydb_scripting.cpp
@@ -8,30 +8,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbScriptingService::TGRpcYdbScriptingService(NActors::TActorSystem *system,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id) {}
-
-void TGRpcYdbScriptingService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbScriptingService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbScriptingService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbScriptingService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbScriptingService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
using Ydb::Scripting::ExecuteYqlRequest;
using Ydb::Scripting::ExecuteYqlResponse;
@@ -55,19 +31,19 @@ void TGRpcYdbScriptingService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
ADD_REQUEST(ExecuteYql, ExecuteYqlRequest, ExecuteYqlResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestOperationCall<ExecuteYqlRequest, ExecuteYqlResponse>
- (ctx, &DoExecuteYqlScript, TRequestAuxSettings{TRateLimiterMode::Ru, nullptr}));
+ (ctx, &DoExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Ru), nullptr}));
})
ADD_REQUEST(StreamExecuteYql, ExecuteYqlRequest, ExecuteYqlPartialResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestNoOperationCall<ExecuteYqlRequest, ExecuteYqlPartialResponse>
- (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr}));
+ (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})
ADD_REQUEST(ExplainYql, ExplainYqlRequest, ExplainYqlResponse, {
ActorSystem_->Send(GRpcRequestProxyId_,
new TGrpcRequestOperationCall<ExplainYqlRequest, ExplainYqlResponse>
- (ctx, &DoExplainYqlScript, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr}));
+ (ctx, &DoExplainYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})
#undef ADD_REQUEST
}
diff --git a/ydb/services/ydb/ydb_scripting.h b/ydb/services/ydb/ydb_scripting.h
index c68839abb21..67de75c14c5 100644
--- a/ydb/services/ydb/ydb_scripting.h
+++ b/ydb/services/ydb/ydb_scripting.h
@@ -3,31 +3,20 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_scripting_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbScriptingService
- : public NGrpc::TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService>
+ : public TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService>
{
public:
- TGRpcYdbScriptingService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+ using TGrpcServiceBase<Ydb::Scripting::V1::ScriptingService>::TGrpcServiceBase;
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService
diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp
index e8770c4d351..35d55fa4b9d 100644
--- a/ydb/services/ydb/ydb_table.cpp
+++ b/ydb/services/ydb/ydb_table.cpp
@@ -7,30 +7,6 @@
namespace NKikimr {
namespace NGRpcService {
-TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
- : ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
-
-void TGRpcYdbTableService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
- CQ_ = cq;
- SetupIncomingRequests(std::move(logger));
-}
-
-void TGRpcYdbTableService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
- Limiter_ = limiter;
-}
-
-bool TGRpcYdbTableService::IncRequest() {
- return Limiter_->Inc();
-}
-
-void TGRpcYdbTableService::DecRequest() {
- Limiter_->Dec();
- Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
-}
-
void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
#ifdef ADD_REQUEST_LIMIT
@@ -48,7 +24,7 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME))->Run();
@@ -59,7 +35,7 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
ActorSystem_->Send(GRpcRequestProxyId_, \
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
- (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME))->Run();
diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h
index 083496f129e..38f9ea77c0b 100644
--- a/ydb/services/ydb/ydb_table.h
+++ b/ydb/services/ydb/ydb_table.h
@@ -3,32 +3,19 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
-
+#include <ydb/core/grpc_services/base/base_service.h>
namespace NKikimr {
namespace NGRpcService {
class TGRpcYdbTableService
- : public NGrpc::TGrpcServiceBase<Ydb::Table::V1::TableService>
+ : public TGrpcServiceBase<Ydb::Table::V1::TableService>
{
public:
- TGRpcYdbTableService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id);
-
- void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
- void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+ using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase;
- bool IncRequest();
- void DecRequest();
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
-
- NActors::TActorSystem* ActorSystem_;
- grpc::ServerCompletionQueue* CQ_ = nullptr;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
- NActors::TActorId GRpcRequestProxyId_;
- NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
} // namespace NGRpcService