aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-01-31 20:33:45 +0300
committereivanov89 <eivanov89@ydb.tech>2023-01-31 20:33:45 +0300
commitbbc27c3fbad3ce6cbd9aa2f5b652df89ca861bab (patch)
tree3ac88835a51f8943675a3de892b8caf9d6f221ad /library/cpp
parent9e9a539f90b6551d4626af86d5b66f8447349936 (diff)
downloadydb-bbc27c3fbad3ce6cbd9aa2f5b652df89ca861bab.tar.gz
add option to set number of workers per CQ
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp37
-rw-r--r--library/cpp/grpc/server/grpc_server.h19
2 files changed, 35 insertions, 21 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 97472206e2..0c05c7404e 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -130,11 +130,16 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
+ size_t completionQueueCount = 1;
+ if (Options_.WorkersPerCompletionQueue) {
+ size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
+ completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
+ } else if (Options_.UseCompletionQueuePerThread) {
+ completionQueueCount = Options_.WorkerThreads;
+ }
+
+ CQS_.reserve(completionQueueCount);
+ for (size_t i = 0; i < completionQueueCount; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
}
@@ -159,23 +164,15 @@ void TGRpcServer::Start() {
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ service->InitService(CQS_, Options_.Logger, index++);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ Ts.reserve(Options_.WorkerThreads);
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i % CQS_.size()];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
}
if (Options_.ExternalListener) {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index c9b48a6676..1f31f7166a 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -54,7 +54,14 @@ struct TServerOptions {
//! Number of worker threads.
DECLARE_FIELD(WorkerThreads, size_t, 2);
- //! Create one completion queue per thread
+ //! Number of workers per completion queue, i.e. when
+ // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2
+ // there will be 4 completion queues. When set to 0 then
+ // only UseCompletionQueuePerThread affects number of CQ.
+ DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0);
+
+ //! Obsolete. Create one completion queue per thread.
+ // Setting true equals to the WorkersPerCompletionQueue=1
DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
//! Memory quota size for grpc server in bytes. Zero means unlimited.
@@ -166,7 +173,17 @@ class IGRpcService: public TThrRefBase {
public:
virtual grpc::Service* GetService() = 0;
virtual void StopService() noexcept = 0;
+
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
+
+ virtual void InitService(
+ const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
+ TLoggerPtr logger,
+ size_t index)
+ {
+ InitService(cqs[index % cqs.size()].get(), logger);
+ }
+
virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;