diff options
author | Vladimir Gordiychuk <folyga@gmail.com> | 2022-02-10 16:50:21 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:21 +0300 |
commit | e7bf3caf59ff1d3936047c1800d0b9adaba5b647 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server | |
parent | 9315561a79f8c08b28065daf027ef493ae27a6d2 (diff) | |
download | ydb-e7bf3caf59ff1d3936047c1800d0b9adaba5b647.tar.gz |
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 90 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 10 |
2 files changed, 50 insertions, 50 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 5d72f74d290..7437b7a8f5e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -19,24 +19,24 @@ namespace NGrpc { using NThreading::TFuture; -static void PullEvents(grpc::ServerCompletionQueue* cq) { - TThread::SetCurrentThreadName("grpc_server"); - while (true) { - void* tag; // uniquely identifies a request. - bool ok; - - if (cq->Next(&tag, &ok)) { - IQueueEvent* const ev(static_cast<IQueueEvent*>(tag)); - - if (!ev->Execute(ok)) { - ev->DestroyRequest(); - } - } else { - break; - } - } -} - +static void PullEvents(grpc::ServerCompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_server"); + while (true) { + void* tag; // uniquely identifies a request. + bool ok; + + if (cq->Next(&tag, &ok)) { + IQueueEvent* const ev(static_cast<IQueueEvent*>(tag)); + + if (!ev->Execute(ok)) { + ev->DestroyRequest(); + } + } else { + break; + } + } +} + TGRpcServer::TGRpcServer(const TServerOptions& opts) : Options_(opts) , Limiter_(Options_.MaxGlobalRequestInFlight) @@ -124,14 +124,14 @@ void TGRpcServer::Start() { builder.SetOption(std::make_unique<TKeepAliveOption>()); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - CQS_.push_back(builder.AddCompletionQueue()); - } - } else { - CQS_.push_back(builder.AddCompletionQueue()); - } - + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + CQS_.push_back(builder.AddCompletionQueue()); + } + } else { + CQS_.push_back(builder.AddCompletionQueue()); + } + if (Options_.GRpcMemoryQuotaBytes) { // See details KIKIMR-6932 /* @@ -149,27 +149,27 @@ void TGRpcServer::Start() { if (!Server_) { ythrow yexception() << "can't start grpc server on " << server_address; } - - size_t index = 0; + + size_t index = 0; for (IGRpcServicePtr service : Services_) { - // TODO: provide something else for services instead of ServerCompletionQueue + // TODO: provide something else for services instead of ServerCompletionQueue service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[i]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } - } else { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[0]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[i]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } + } else { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[0]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } } if (Options_.ExternalListener) { @@ -214,8 +214,8 @@ void TGRpcServer::Stop() { } // Always shutdown the completion queue after the server. - for (auto& cq : CQS_) { - cq->Shutdown(); + for (auto& cq : CQS_) { + cq->Shutdown(); } for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 4d6626efb9a..d6814a90a0d 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -53,9 +53,9 @@ struct TServerOptions { //! Number of worker threads. DECLARE_FIELD(WorkerThreads, size_t, 2); - //! Create one completion queue per thread - DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); - + //! Create one completion queue per thread + DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); + //! Memory quota size for grpc server in bytes. Zero means unlimited. DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); @@ -328,7 +328,7 @@ private: TAdaptiveLock Lock_; }; -class TGRpcServer { +class TGRpcServer { public: using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; TGRpcServer(const TServerOptions& opts); @@ -346,7 +346,7 @@ private: const TServerOptions Options_; std::unique_ptr<grpc::Server> Server_; - std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; TVector<IThreadRef> Ts; TVector<IGRpcServicePtr> Services_; |