aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-07 15:47:09 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-07 15:47:09 +0300
commit37151df3c62268ba29778745a30e59c0a5f5b8fa (patch)
tree19b27dc5529c8974e3410c6b89f6265fd46b4cf4
parenta194faf0a109ac0b04d0372a449acbd45f00ada2 (diff)
downloadydb-37151df3c62268ba29778745a30e59c0a5f5b8fa.tar.gz
use multiple grpc proxies in table service
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp19
-rw-r--r--ydb/core/driver_lib/run/run.cpp61
-rw-r--r--ydb/core/grpc_services/base/base_service.h31
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h9
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/testlib/test_client.cpp62
-rw-r--r--ydb/services/ydb/ydb_table.cpp30
-rw-r--r--ydb/services/ydb/ydb_table.h9
8 files changed, 144 insertions, 79 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 678c64da0b..4281556e7b 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -1614,14 +1614,17 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
}
}
- if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId())) {
- auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck()
- ? NGRpcService::CreateGRpcRequestProxySimple(Config)
- : NGRpcService::CreateGRpcRequestProxy(Config);
- setup->LocalServices.push_back(std::pair<TActorId,
- TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(),
- TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled,
- appData->UserPoolId)));
+ if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId(0))) {
+ const size_t proxyCount = Config.HasGRpcConfig() ? Config.GetGRpcConfig().GetGRpcProxyCount() : 1UL;
+ for (size_t i = 0; i < proxyCount; ++i) {
+ auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck()
+ ? NGRpcService::CreateGRpcRequestProxySimple(Config)
+ : NGRpcService::CreateGRpcRequestProxy(Config);
+ setup->LocalServices.push_back(std::pair<TActorId,
+ TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(i),
+ TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled,
+ appData->UserPoolId)));
+ }
}
if (!IsServiceInitialized(setup, NKesus::MakeKesusProxyServiceId())) {
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 09cde779df..c25ff2807c 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -669,7 +669,14 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
}
- const auto grpcRequestProxyId = NGRpcService::CreateGRpcRequestProxyId();
+
+ // TODO: for now we pass multiple proxies only to the table service
+ const size_t proxyCount = Max(ui32{1}, grpcConfig.GetGRpcProxyCount());
+ TVector<TActorId> grpcRequestProxies;
+ grpcRequestProxies.reserve(proxyCount);
+ for (size_t i = 0; i < proxyCount; ++i) {
+ grpcRequestProxies.push_back(NGRpcService::CreateGRpcRequestProxyId(i));
+ }
if (hasLegacy) {
// start legacy service
@@ -697,60 +704,60 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
}
if (hasTableService) {
- server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId,
+ server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxies,
hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}
if (hasClickhouseInternal) {
server.AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(ActorSystem.Get(), Counters,
- AppData->InFlightLimiterRegistry, grpcRequestProxyId, hasClickhouseInternal.IsRlAllowed()));
+ AppData->InFlightLimiterRegistry, grpcRequestProxies[0], hasClickhouseInternal.IsRlAllowed()));
}
if (hasScripting) {
server.AddService(new NGRpcService::TGRpcYdbScriptingService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasScripting.IsRlAllowed()));
+ grpcRequestProxies[0], hasScripting.IsRlAllowed()));
}
if (hasLongTx) {
server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasLongTx.IsRlAllowed()));
+ grpcRequestProxies[0], hasLongTx.IsRlAllowed()));
}
if (hasSchemeService) {
// RPC RL enabled
// We have no way to disable or enable this service explicitly
server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, true /*hasSchemeService.IsRlAllowed()*/));
+ grpcRequestProxies[0], true /*hasSchemeService.IsRlAllowed()*/));
}
if (hasOperationService) {
server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasOperationService.IsRlAllowed()));
+ grpcRequestProxies[0], hasOperationService.IsRlAllowed()));
}
if (hasExport) {
server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasExport.IsRlAllowed()));
+ grpcRequestProxies[0], hasExport.IsRlAllowed()));
}
if (hasImport) {
server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasImport.IsRlAllowed()));
+ grpcRequestProxies[0], hasImport.IsRlAllowed()));
}
if (hasKesus) {
server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasKesus.IsRlAllowed()));
+ grpcRequestProxies[0], hasKesus.IsRlAllowed()));
}
if (hasPQv1) {
server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
- grpcRequestProxyId, hasPQv1.IsRlAllowed()));
+ grpcRequestProxies[0], hasPQv1.IsRlAllowed()));
}
if (hasPQv1 || hasTopic) {
server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(),
- grpcRequestProxyId, hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed()));
+ grpcRequestProxies[0], hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed()));
}
if (hasPQCD) {
@@ -761,64 +768,64 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
inflightLimit = pqcdConfig.GetRequestInflightLimit();
}
server.AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(
- ActorSystem.Get(), Counters, grpcRequestProxyId, inflightLimit
+ ActorSystem.Get(), Counters, grpcRequestProxies[0], inflightLimit
));
}
if (hasCms) {
server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasCms.IsRlAllowed()));
+ grpcRequestProxies[0], hasCms.IsRlAllowed()));
}
if (hasDiscovery) {
server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasDiscovery.IsRlAllowed()));
+ grpcRequestProxies[0], hasDiscovery.IsRlAllowed()));
}
if (hasLocalDiscovery) {
server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters,
- grpcRequestProxyId));
+ grpcRequestProxies[0]));
}
if (hasRateLimiter) {
- server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
}
if (hasMonitoring) {
server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasMonitoring.IsRlAllowed()));
+ grpcRequestProxies[0], hasMonitoring.IsRlAllowed()));
}
if (hasAuth) {
server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasAuth.IsRlAllowed()));
+ grpcRequestProxies[0], hasAuth.IsRlAllowed()));
}
if (hasDataStreams) {
server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasDataStreams.IsRlAllowed()));
+ grpcRequestProxies[0], hasDataStreams.IsRlAllowed()));
}
if (hasYandexQuery) {
- server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
- server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
- server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
+ server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
+ server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
} /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) {
- server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxies[0]));
}
if (hasQueryService) {
server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasDataStreams.IsRlAllowed()));
+ grpcRequestProxies[0], hasDataStreams.IsRlAllowed()));
}
if (hasLogStore) {
server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters,
- grpcRequestProxyId, hasLogStore.IsRlAllowed()));
+ grpcRequestProxies[0], hasLogStore.IsRlAllowed()));
}
if (ModuleFactories) {
- for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxyId)) {
+ for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
server.AddService(service);
}
}
diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h
index 9b2421e065..d94ab49242 100644
--- a/ydb/core/grpc_services/base/base_service.h
+++ b/ydb/core/grpc_services/base/base_service.h
@@ -27,12 +27,30 @@ class TGrpcServiceBase
, public TGrpcServiceCfg
{
public:
- TGrpcServiceBase(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id, bool rlAllowed)
- : TGrpcServiceCfg(rlAllowed)
- , ActorSystem_(system)
- , Counters_(counters)
- , GRpcRequestProxyId_(id)
-{ }
+ TGrpcServiceBase(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& proxyId,
+ bool rlAllowed)
+ : TGrpcServiceCfg(rlAllowed)
+ , ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(proxyId)
+ , GRpcProxies_{proxyId}
+ {
+ }
+
+ TGrpcServiceBase(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed)
+ : TGrpcServiceCfg(rlAllowed)
+ , ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(proxies[0])
+ , GRpcProxies_(proxies)
+ {
+ Y_VERIFY(proxies.size());
+ }
void InitService(
const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
@@ -78,6 +96,7 @@ protected:
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
const NActors::TActorId GRpcRequestProxyId_;
+ const TVector<NActors::TActorId> GRpcProxies_;
NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index fe75617184..fa7218f053 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -76,8 +76,13 @@ protected:
TActorId DiscoveryCacheActorID;
};
-inline TActorId CreateGRpcRequestProxyId() {
- const auto actorId = TActorId(0, "GRpcReqProxy");
+inline TActorId CreateGRpcRequestProxyId(int n = 0) {
+ if (n == 0) {
+ const auto actorId = TActorId(0, "GRpcReqProxy");
+ return actorId;
+ }
+
+ const auto actorId = TActorId(0, TStringBuilder() << "GRpcReqPro" << n);
return actorId;
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 24d42aa234..3b2d9d33b1 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -656,6 +656,8 @@ message TGRpcConfig {
optional uint32 WorkersPerCompletionQueue = 104 [default = 1];
optional uint32 HandlersPerCompletionQueue = 105 [default = 10];
+ optional uint32 GRpcProxyCount = 106 [default = 2];
+
repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 6d7b66eadc..ff072904ee 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -266,9 +266,17 @@ namespace Tests {
auto grpcService = new NGRpcProxy::TGRpcService();
auto system(Runtime->GetAnyNodeActorSystem());
- auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(Settings->AppConfig);
- auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled);
- system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId);
+
+ const size_t proxyCount = Max(ui32{1}, Settings->AppConfig.GetGRpcConfig().GetGRpcProxyCount());
+ TVector<TActorId> grpcRequestProxies;
+ grpcRequestProxies.reserve(proxyCount);
+ for (size_t i = 0; i < proxyCount; ++i) {
+ auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(Settings->AppConfig);
+ auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled);
+ system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId);
+ grpcRequestProxies.push_back(grpcRequestProxyId);
+ }
+
auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled);
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);
@@ -321,38 +329,38 @@ namespace Tests {
future.Subscribe(startCb);
GRpcServer->AddService(grpcService);
- GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId, true));
- GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxies, true, 1));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxies[0]));
+ GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0]));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true));
if (Settings->EnableYq) {
- GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId));
- GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxies[0]));
+ GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
+ GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
}
if (const auto& factory = Settings->GrpcServiceFactory) {
// All services enabled by default for ut
static const std::unordered_set<TString> dummy;
- for (const auto& service : factory->Create(dummy, dummy, system, counters, grpcRequestProxyId)) {
+ for (const auto& service : factory->Create(dummy, dummy, system, counters, grpcRequestProxies[0])) {
GRpcServer->AddService(service);
}
}
- GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId, true));
- GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxies[0], true));
GRpcServer->Start();
}
diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp
index 4f66177165..f0bb08649a 100644
--- a/ydb/services/ydb/ydb_table.cpp
+++ b/ydb/services/ydb/ydb_table.cpp
@@ -9,10 +9,20 @@ namespace NGRpcService {
TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id,
+ const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue)
- : TGrpcServiceBase(system, counters, id, rlAllowed)
+ : TGrpcServiceBase(system, counters, proxyId, rlAllowed)
+ , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
+{
+}
+
+TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue)
+ : TGrpcServiceBase(system, counters, proxies, rlAllowed)
, HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
{
}
@@ -20,6 +30,8 @@ TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+ size_t proxyCounter = 0;
+
#ifdef ADD_REQUEST_LIMIT
#error ADD_REQUEST_LIMIT macro already defined
#endif
@@ -29,32 +41,34 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#endif
#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
- for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
for (auto* cq: CQS) { \
MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \
(this, &Service_, cq, \
- [this](NGrpc::IRequestContextBase *ctx) { \
+ [this, proxyCounter](NGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ActorSystem_->Send(GRpcRequestProxyId_, \
+ ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \
(ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME))->Run(); \
+ ++proxyCounter; \
} \
}
#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \
- for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
for (auto* cq: CQS) { \
MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
(this, &Service_, cq, \
- [this](NGrpc::IRequestContextBase *ctx) { \
+ [this, proxyCounter](NGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ActorSystem_->Send(GRpcRequestProxyId_, \
+ ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
(ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
}, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("table", #NAME))->Run(); \
+ ++proxyCounter; \
} \
}
diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h
index 633ac232d2..da248ea6e4 100644
--- a/ydb/services/ydb/ydb_table.h
+++ b/ydb/services/ydb/ydb_table.h
@@ -17,10 +17,17 @@ public:
TGRpcYdbTableService(
NActors::TActorSystem *system,
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
- NActors::TActorId id,
+ const NActors::TActorId& proxyId,
bool rlAllowed,
size_t handlersPerCompletionQueue = 1);
+ TGRpcYdbTableService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue);
+
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);