diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-02-02 13:02:21 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-02-02 13:02:21 +0300 |
commit | 50a0b28842e632c57e92b7bdef0163dc250cfde0 (patch) | |
tree | fde09ff987de64592d473ca22fda5f33df01861c | |
parent | 2073fdf239e9c35938d584718c71fb12fe6afb7d (diff) | |
download | ydb-50a0b28842e632c57e92b7bdef0163dc250cfde0.tar.gz |
provide all CQs to the services and create multiple request handlers per CQ
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base_service.h | 20 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 3 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table.cpp | 57 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table.h | 10 |
5 files changed, 72 insertions, 21 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index bb1bbfb1977..09cde779df0 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -698,7 +698,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasTableService) { server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId, - hasTableService.IsRlAllowed())); + hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasClickhouseInternal) { @@ -832,6 +832,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { opts.SetHost(grpcConfig.GetHost()); opts.SetPort(grpcConfig.GetPort()); opts.SetWorkerThreads(grpcConfig.GetWorkerThreads()); + opts.SetWorkersPerCompletionQueue(grpcConfig.GetWorkersPerCompletionQueue()); opts.SetGRpcMemoryQuotaBytes(grpcConfig.GetGRpcMemoryQuotaBytes()); opts.SetMaxMessageSize(grpcConfig.HasMaxMessageSize() ? grpcConfig.GetMaxMessageSize() : DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); opts.SetMaxGlobalRequestInFlight(grpcConfig.GetMaxInFlight()); diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h index 1c6f6f64322..9b2421e0651 100644 --- a/ydb/core/grpc_services/base/base_service.h +++ b/ydb/core/grpc_services/base/base_service.h @@ -34,8 +34,24 @@ public: , GRpcRequestProxyId_(id) { } + void InitService( + const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, + NGrpc::TLoggerPtr logger, + size_t index) override + { + CQS.reserve(cqs.size()); + for (auto& cq: cqs) { + CQS.push_back(cq.get()); + } + + CQ_ = CQS[index % cqs.size()]; + + // note that we might call an overloaded InitService(), and not the one from this class + InitService(CQ_, logger); + } + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override { - CQ_ = cq; + CQ_ = cq; // might be self assignment, but it's OK SetupIncomingRequests(std::move(logger)); } @@ -58,6 +74,8 @@ protected: NActors::TActorSystem* ActorSystem_; grpc::ServerCompletionQueue* CQ_ = nullptr; + std::vector<grpc::ServerCompletionQueue*> CQS; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; const NActors::TActorId GRpcRequestProxyId_; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f1f98a7bd63..96110e8f8fb 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -653,6 +653,9 @@ message TGRpcConfig { optional uint32 KeepAliveMaxProbeCount = 102 [default = 3]; // TCP_KEEPCNT optional uint32 KeepAliveProbeIntervalSec = 103 [default = 10]; // TCP_KEEPINTVL + optional uint32 WorkersPerCompletionQueue = 104 [default = 1]; + optional uint32 HandlersPerCompletionQueue = 105 [default = 10]; + repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints } diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp index 35d55fa4b9d..4f66177165c 100644 --- a/ydb/services/ydb/ydb_table.cpp +++ b/ydb/services/ydb/ydb_table.cpp @@ -7,8 +7,19 @@ namespace NKikimr { namespace NGRpcService { +TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + NActors::TActorId id, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, id, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + #ifdef ADD_REQUEST_LIMIT #error ADD_REQUEST_LIMIT macro already defined #endif @@ -17,27 +28,35 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #error ADD_STREAM_REQUEST_LIMIT macro already defined #endif -#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ - MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \ - (this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \ - (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ - }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("table", #NAME))->Run(); +#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \ + (this, &Service_, cq, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ + }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("table", #NAME))->Run(); \ + } \ + } #define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \ - MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \ - (this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \ - (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ - }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("table", #NAME))->Run(); + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \ + (this, &Service_, cq, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \ + }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("table", #NAME))->Run(); \ + } \ + } ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps) ADD_REQUEST_LIMIT(KeepAlive, DoKeepAliveRequest, Rps) diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h index 38f9ea77c0b..633ac232d22 100644 --- a/ydb/services/ydb/ydb_table.h +++ b/ydb/services/ydb/ydb_table.h @@ -14,8 +14,18 @@ class TGRpcYdbTableService public: using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase; + TGRpcYdbTableService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + NActors::TActorId id, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; }; } // namespace NGRpcService |