diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-01-31 20:33:45 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-01-31 20:33:45 +0300 |
commit | bbc27c3fbad3ce6cbd9aa2f5b652df89ca861bab (patch) | |
tree | 3ac88835a51f8943675a3de892b8caf9d6f221ad /library/cpp | |
parent | 9e9a539f90b6551d4626af86d5b66f8447349936 (diff) | |
download | ydb-bbc27c3fbad3ce6cbd9aa2f5b652df89ca861bab.tar.gz |
add option to set number of workers per CQ
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 37 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 19 |
2 files changed, 35 insertions, 21 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 97472206e2..0c05c7404e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -130,11 +130,16 @@ void TGRpcServer::Start() { builder.SetOption(std::make_unique<TKeepAliveOption>()); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - CQS_.push_back(builder.AddCompletionQueue()); - } - } else { + size_t completionQueueCount = 1; + if (Options_.WorkersPerCompletionQueue) { + size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue); + completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling + } else if (Options_.UseCompletionQueuePerThread) { + completionQueueCount = Options_.WorkerThreads; + } + + CQS_.reserve(completionQueueCount); + for (size_t i = 0; i < completionQueueCount; ++i) { CQS_.push_back(builder.AddCompletionQueue()); } @@ -159,23 +164,15 @@ void TGRpcServer::Start() { size_t index = 0; for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue - service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); + service->InitService(CQS_, Options_.Logger, index++); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[i]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } - } else { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[0]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } + Ts.reserve(Options_.WorkerThreads); + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[i % CQS_.size()]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); } if (Options_.ExternalListener) { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index c9b48a6676..1f31f7166a 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -54,7 +54,14 @@ struct TServerOptions { //! Number of worker threads. DECLARE_FIELD(WorkerThreads, size_t, 2); - //! Create one completion queue per thread + //! Number of workers per completion queue, i.e. when + // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2 + // there will be 4 completion queues. When set to 0 then + // only UseCompletionQueuePerThread affects number of CQ. + DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0); + + //! Obsolete. Create one completion queue per thread. + // Setting true equals to the WorkersPerCompletionQueue=1 DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); //! Memory quota size for grpc server in bytes. Zero means unlimited. @@ -166,7 +173,17 @@ class IGRpcService: public TThrRefBase { public: virtual grpc::Service* GetService() = 0; virtual void StopService() noexcept = 0; + virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; + + virtual void InitService( + const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, + TLoggerPtr logger, + size_t index) + { + InitService(cqs[index % cqs.size()].get(), logger); + } + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; |