diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-07 15:47:09 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-07 15:47:09 +0300 |
commit | 37151df3c62268ba29778745a30e59c0a5f5b8fa (patch) | |
tree | 19b27dc5529c8974e3410c6b89f6265fd46b4cf4 | |
parent | a194faf0a109ac0b04d0372a449acbd45f00ada2 (diff) | |
download | ydb-37151df3c62268ba29778745a30e59c0a5f5b8fa.tar.gz |
use multiple grpc proxies in table service
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 19 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 61 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base_service.h | 31 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 9 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 62 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table.cpp | 30 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table.h | 9 |
8 files changed, 144 insertions, 79 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 678c64da0b..4281556e7b 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -1614,14 +1614,17 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se } } - if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId())) { - auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck() - ? NGRpcService::CreateGRpcRequestProxySimple(Config) - : NGRpcService::CreateGRpcRequestProxy(Config); - setup->LocalServices.push_back(std::pair<TActorId, - TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(), - TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled, - appData->UserPoolId))); + if (!IsServiceInitialized(setup, NGRpcService::CreateGRpcRequestProxyId(0))) { + const size_t proxyCount = Config.HasGRpcConfig() ? Config.GetGRpcConfig().GetGRpcProxyCount() : 1UL; + for (size_t i = 0; i < proxyCount; ++i) { + auto grpcReqProxy = Config.HasGRpcConfig() && Config.GetGRpcConfig().GetSkipSchemeCheck() + ? NGRpcService::CreateGRpcRequestProxySimple(Config) + : NGRpcService::CreateGRpcRequestProxy(Config); + setup->LocalServices.push_back(std::pair<TActorId, + TActorSetupCmd>(NGRpcService::CreateGRpcRequestProxyId(i), + TActorSetupCmd(grpcReqProxy, TMailboxType::ReadAsFilled, + appData->UserPoolId))); + } } if (!IsServiceInitialized(setup, NKesus::MakeKesusProxyServiceId())) { diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 09cde779df..c25ff2807c 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -669,7 +669,14 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } } - const auto grpcRequestProxyId = NGRpcService::CreateGRpcRequestProxyId(); + + // TODO: for now we pass multiple proxies only to the table service + const size_t proxyCount = Max(ui32{1}, grpcConfig.GetGRpcProxyCount()); + TVector<TActorId> grpcRequestProxies; + grpcRequestProxies.reserve(proxyCount); + for (size_t i = 0; i < proxyCount; ++i) { + grpcRequestProxies.push_back(NGRpcService::CreateGRpcRequestProxyId(i)); + } if (hasLegacy) { // start legacy service @@ -697,60 +704,60 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { } if (hasTableService) { - server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId, + server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxies, hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasClickhouseInternal) { server.AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(ActorSystem.Get(), Counters, - AppData->InFlightLimiterRegistry, grpcRequestProxyId, hasClickhouseInternal.IsRlAllowed())); + AppData->InFlightLimiterRegistry, grpcRequestProxies[0], hasClickhouseInternal.IsRlAllowed())); } if (hasScripting) { server.AddService(new NGRpcService::TGRpcYdbScriptingService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasScripting.IsRlAllowed())); + grpcRequestProxies[0], hasScripting.IsRlAllowed())); } if (hasLongTx) { server.AddService(new NGRpcService::TGRpcYdbLongTxService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasLongTx.IsRlAllowed())); + grpcRequestProxies[0], hasLongTx.IsRlAllowed())); } if (hasSchemeService) { // RPC RL enabled // We have no way to disable or enable this service explicitly server.AddService(new NGRpcService::TGRpcYdbSchemeService(ActorSystem.Get(), Counters, - grpcRequestProxyId, true /*hasSchemeService.IsRlAllowed()*/)); + grpcRequestProxies[0], true /*hasSchemeService.IsRlAllowed()*/)); } if (hasOperationService) { server.AddService(new NGRpcService::TGRpcOperationService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasOperationService.IsRlAllowed())); + grpcRequestProxies[0], hasOperationService.IsRlAllowed())); } if (hasExport) { server.AddService(new NGRpcService::TGRpcYdbExportService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasExport.IsRlAllowed())); + grpcRequestProxies[0], hasExport.IsRlAllowed())); } if (hasImport) { server.AddService(new NGRpcService::TGRpcYdbImportService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasImport.IsRlAllowed())); + grpcRequestProxies[0], hasImport.IsRlAllowed())); } if (hasKesus) { server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasKesus.IsRlAllowed())); + grpcRequestProxies[0], hasKesus.IsRlAllowed())); } if (hasPQv1) { server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), - grpcRequestProxyId, hasPQv1.IsRlAllowed())); + grpcRequestProxies[0], hasPQv1.IsRlAllowed())); } if (hasPQv1 || hasTopic) { server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), - grpcRequestProxyId, hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed())); + grpcRequestProxies[0], hasTopic.IsRlAllowed() || hasPQv1.IsRlAllowed())); } if (hasPQCD) { @@ -761,64 +768,64 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { inflightLimit = pqcdConfig.GetRequestInflightLimit(); } server.AddService(new NGRpcService::TGRpcPQClusterDiscoveryService( - ActorSystem.Get(), Counters, grpcRequestProxyId, inflightLimit + ActorSystem.Get(), Counters, grpcRequestProxies[0], inflightLimit )); } if (hasCms) { server.AddService(new NGRpcService::TGRpcCmsService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasCms.IsRlAllowed())); + grpcRequestProxies[0], hasCms.IsRlAllowed())); } if (hasDiscovery) { server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasDiscovery.IsRlAllowed())); + grpcRequestProxies[0], hasDiscovery.IsRlAllowed())); } if (hasLocalDiscovery) { server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, - grpcRequestProxyId)); + grpcRequestProxies[0])); } if (hasRateLimiter) { - server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxies[0])); } if (hasMonitoring) { server.AddService(new NGRpcService::TGRpcMonitoringService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasMonitoring.IsRlAllowed())); + grpcRequestProxies[0], hasMonitoring.IsRlAllowed())); } if (hasAuth) { server.AddService(new NGRpcService::TGRpcAuthService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasAuth.IsRlAllowed())); + grpcRequestProxies[0], hasAuth.IsRlAllowed())); } if (hasDataStreams) { server.AddService(new NGRpcService::TGRpcDataStreamsService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasDataStreams.IsRlAllowed())); + grpcRequestProxies[0], hasDataStreams.IsRlAllowed())); } if (hasYandexQuery) { - server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); - server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); - server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxies[0])); + server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxies[0])); + server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxies[0])); } /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) { - server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxies[0])); } if (hasQueryService) { server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasDataStreams.IsRlAllowed())); + grpcRequestProxies[0], hasDataStreams.IsRlAllowed())); } if (hasLogStore) { server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, - grpcRequestProxyId, hasLogStore.IsRlAllowed())); + grpcRequestProxies[0], hasLogStore.IsRlAllowed())); } if (ModuleFactories) { - for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxyId)) { + for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) { server.AddService(service); } } diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h index 9b2421e065..d94ab49242 100644 --- a/ydb/core/grpc_services/base/base_service.h +++ b/ydb/core/grpc_services/base/base_service.h @@ -27,12 +27,30 @@ class TGrpcServiceBase , public TGrpcServiceCfg { public: - TGrpcServiceBase(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id, bool rlAllowed) - : TGrpcServiceCfg(rlAllowed) - , ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) -{ } + TGrpcServiceBase(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed) + : TGrpcServiceCfg(rlAllowed) + , ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(proxyId) + , GRpcProxies_{proxyId} + { + } + + TGrpcServiceBase(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed) + : TGrpcServiceCfg(rlAllowed) + , ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(proxies[0]) + , GRpcProxies_(proxies) + { + Y_VERIFY(proxies.size()); + } void InitService( const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, @@ -78,6 +96,7 @@ protected: TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; const NActors::TActorId GRpcRequestProxyId_; + const TVector<NActors::TActorId> GRpcProxies_; NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index fe75617184..fa7218f053 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -76,8 +76,13 @@ protected: TActorId DiscoveryCacheActorID; }; -inline TActorId CreateGRpcRequestProxyId() { - const auto actorId = TActorId(0, "GRpcReqProxy"); +inline TActorId CreateGRpcRequestProxyId(int n = 0) { + if (n == 0) { + const auto actorId = TActorId(0, "GRpcReqProxy"); + return actorId; + } + + const auto actorId = TActorId(0, TStringBuilder() << "GRpcReqPro" << n); return actorId; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 24d42aa234..3b2d9d33b1 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -656,6 +656,8 @@ message TGRpcConfig { optional uint32 WorkersPerCompletionQueue = 104 [default = 1]; optional uint32 HandlersPerCompletionQueue = 105 [default = 10]; + optional uint32 GRpcProxyCount = 106 [default = 2]; + repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 6d7b66eadc..ff072904ee 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -266,9 +266,17 @@ namespace Tests { auto grpcService = new NGRpcProxy::TGRpcService(); auto system(Runtime->GetAnyNodeActorSystem()); - auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(Settings->AppConfig); - auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled); - system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId); + + const size_t proxyCount = Max(ui32{1}, Settings->AppConfig.GetGRpcConfig().GetGRpcProxyCount()); + TVector<TActorId> grpcRequestProxies; + grpcRequestProxies.reserve(proxyCount); + for (size_t i = 0; i < proxyCount; ++i) { + auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(Settings->AppConfig); + auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled); + system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId); + grpcRequestProxies.push_back(grpcRequestProxyId); + } + auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled); system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon); @@ -321,38 +329,38 @@ namespace Tests { future.Subscribe(startCb); GRpcServer->AddService(grpcService); - GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxyId, true)); - GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbExportService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbImportService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbSchemeService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbTableService(system, counters, grpcRequestProxies, true, 1)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxies[0])); + GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); + GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true)); if (Settings->EnableYq) { - GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxies[0])); + GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); + GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); } if (const auto& factory = Settings->GrpcServiceFactory) { // All services enabled by default for ut static const std::unordered_set<TString> dummy; - for (const auto& service : factory->Create(dummy, dummy, system, counters, grpcRequestProxyId)) { + for (const auto& service : factory->Create(dummy, dummy, system, counters, grpcRequestProxies[0])) { GRpcServer->AddService(service); } } - GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxyId, true)); - GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbLogStoreService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcAuthService(system, counters, grpcRequestProxies[0], true)); GRpcServer->Start(); } diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp index 4f66177165..f0bb08649a 100644 --- a/ydb/services/ydb/ydb_table.cpp +++ b/ydb/services/ydb/ydb_table.cpp @@ -9,10 +9,20 @@ namespace NGRpcService { TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id, + const NActors::TActorId& proxyId, bool rlAllowed, size_t handlersPerCompletionQueue) - : TGrpcServiceBase(system, counters, id, rlAllowed) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed) , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) { } @@ -20,6 +30,8 @@ TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; + #ifdef ADD_REQUEST_LIMIT #error ADD_REQUEST_LIMIT macro already defined #endif @@ -29,32 +41,34 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #endif #define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ - for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ for (auto* cq: CQS) { \ MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \ (this, &Service_, cq, \ - [this](NGrpc::IRequestContextBase *ctx) { \ + [this, proxyCounter](NGrpc::IRequestContextBase *ctx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("table", #NAME))->Run(); \ + ++proxyCounter; \ } \ } #define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \ - for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ for (auto* cq: CQS) { \ MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \ (this, &Service_, cq, \ - [this](NGrpc::IRequestContextBase *ctx) { \ + [this, proxyCounter](NGrpc::IRequestContextBase *ctx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("table", #NAME))->Run(); \ + ++proxyCounter; \ } \ } diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h index 633ac232d2..da248ea6e4 100644 --- a/ydb/services/ydb/ydb_table.h +++ b/ydb/services/ydb/ydb_table.h @@ -17,10 +17,17 @@ public: TGRpcYdbTableService( NActors::TActorSystem *system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id, + const NActors::TActorId& proxyId, bool rlAllowed, size_t handlersPerCompletionQueue = 1); + TGRpcYdbTableService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); |