aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-02-02 13:02:21 +0300
committereivanov89 <eivanov89@ydb.tech>2023-02-02 13:02:21 +0300
commit50a0b28842e632c57e92b7bdef0163dc250cfde0 (patch)
treefde09ff987de64592d473ca22fda5f33df01861c
parent2073fdf239e9c35938d584718c71fb12fe6afb7d (diff)
downloadydb-50a0b28842e632c57e92b7bdef0163dc250cfde0.tar.gz
provide all CQs to the services and create multiple request handlers per CQ
-rw-r--r--ydb/core/driver_lib/run/run.cpp3
-rw-r--r--ydb/core/grpc_services/base/base_service.h20
-rw-r--r--ydb/core/protos/config.proto3
-rw-r--r--ydb/services/ydb/ydb_table.cpp57
-rw-r--r--ydb/services/ydb/ydb_table.h10
5 files changed, 72 insertions, 21 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index bb1bbfb1977..09cde779df0 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -698,7 +698,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (hasTableService) {
server.AddService(new NGRpcService::TGRpcYdbTableService(ActorSystem.Get(), Counters, grpcRequestProxyId,
- hasTableService.IsRlAllowed()));
+ hasTableService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}
if (hasClickhouseInternal) {
@@ -832,6 +832,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
opts.SetHost(grpcConfig.GetHost());
opts.SetPort(grpcConfig.GetPort());
opts.SetWorkerThreads(grpcConfig.GetWorkerThreads());
+ opts.SetWorkersPerCompletionQueue(grpcConfig.GetWorkersPerCompletionQueue());
opts.SetGRpcMemoryQuotaBytes(grpcConfig.GetGRpcMemoryQuotaBytes());
opts.SetMaxMessageSize(grpcConfig.HasMaxMessageSize() ? grpcConfig.GetMaxMessageSize() : DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
opts.SetMaxGlobalRequestInFlight(grpcConfig.GetMaxInFlight());
diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h
index 1c6f6f64322..9b2421e0651 100644
--- a/ydb/core/grpc_services/base/base_service.h
+++ b/ydb/core/grpc_services/base/base_service.h
@@ -34,8 +34,24 @@ public:
, GRpcRequestProxyId_(id)
{ }
+ void InitService(
+ const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
+ NGrpc::TLoggerPtr logger,
+ size_t index) override
+ {
+ CQS.reserve(cqs.size());
+ for (auto& cq: cqs) {
+ CQS.push_back(cq.get());
+ }
+
+ CQ_ = CQS[index % cqs.size()];
+
+ // note that we might call an overloaded InitService(), and not the one from this class
+ InitService(CQ_, logger);
+ }
+
void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override {
- CQ_ = cq;
+ CQ_ = cq; // might be self assignment, but it's OK
SetupIncomingRequests(std::move(logger));
}
@@ -58,6 +74,8 @@ protected:
NActors::TActorSystem* ActorSystem_;
grpc::ServerCompletionQueue* CQ_ = nullptr;
+ std::vector<grpc::ServerCompletionQueue*> CQS;
+
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
const NActors::TActorId GRpcRequestProxyId_;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index f1f98a7bd63..96110e8f8fb 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -653,6 +653,9 @@ message TGRpcConfig {
optional uint32 KeepAliveMaxProbeCount = 102 [default = 3]; // TCP_KEEPCNT
optional uint32 KeepAliveProbeIntervalSec = 103 [default = 10]; // TCP_KEEPINTVL
+ optional uint32 WorkersPerCompletionQueue = 104 [default = 1];
+ optional uint32 HandlersPerCompletionQueue = 105 [default = 10];
+
repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints
}
diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp
index 35d55fa4b9d..4f66177165c 100644
--- a/ydb/services/ydb/ydb_table.cpp
+++ b/ydb/services/ydb/ydb_table.cpp
@@ -7,8 +7,19 @@
namespace NKikimr {
namespace NGRpcService {
+TGRpcYdbTableService::TGRpcYdbTableService(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue)
+ : TGrpcServiceBase(system, counters, id, rlAllowed)
+ , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
+{
+}
+
void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+
#ifdef ADD_REQUEST_LIMIT
#error ADD_REQUEST_LIMIT macro already defined
#endif
@@ -17,27 +28,35 @@ void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#error ADD_STREAM_REQUEST_LIMIT macro already defined
#endif
-#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
- MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \
- (this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ActorSystem_->Send(GRpcRequestProxyId_, \
- new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \
- (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
- }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("table", #NAME))->Run();
+#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (auto* cq: CQS) { \
+ MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \
+ (this, &Service_, cq, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
+ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("table", #NAME))->Run(); \
+ } \
+ }
#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \
- MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
- (this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ActorSystem_->Send(GRpcRequestProxyId_, \
- new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
- (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
- }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("table", #NAME))->Run();
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (auto* cq: CQS) { \
+ MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
+ (this, &Service_, cq, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), nullptr})); \
+ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("table", #NAME))->Run(); \
+ } \
+ }
ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps)
ADD_REQUEST_LIMIT(KeepAlive, DoKeepAliveRequest, Rps)
diff --git a/ydb/services/ydb/ydb_table.h b/ydb/services/ydb/ydb_table.h
index 38f9ea77c0b..633ac232d22 100644
--- a/ydb/services/ydb/ydb_table.h
+++ b/ydb/services/ydb/ydb_table.h
@@ -14,8 +14,18 @@ class TGRpcYdbTableService
public:
using TGrpcServiceBase<Ydb::Table::V1::TableService>::TGrpcServiceBase;
+ TGRpcYdbTableService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue = 1);
+
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+private:
+ const size_t HandlersPerCompletionQueue;
};
} // namespace NGRpcService