aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/grpc/server/grpc_server.cpp
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp37
1 files changed, 17 insertions, 20 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 97472206e2..0c05c7404e 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -130,11 +130,16 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
+ size_t completionQueueCount = 1;
+ if (Options_.WorkersPerCompletionQueue) {
+ size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
+ completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
+ } else if (Options_.UseCompletionQueuePerThread) {
+ completionQueueCount = Options_.WorkerThreads;
+ }
+
+ CQS_.reserve(completionQueueCount);
+ for (size_t i = 0; i < completionQueueCount; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
}
@@ -159,23 +164,15 @@ void TGRpcServer::Start() {
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ service->InitService(CQS_, Options_.Logger, index++);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ Ts.reserve(Options_.WorkerThreads);
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i % CQS_.size()];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
}
if (Options_.ExternalListener) {