aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server
diff options
context:
space:
mode:
authorVladimir Gordiychuk <folyga@gmail.com>2022-02-10 16:50:21 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:21 +0300
commite7bf3caf59ff1d3936047c1800d0b9adaba5b647 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server
parent9315561a79f8c08b28065daf027ef493ae27a6d2 (diff)
downloadydb-e7bf3caf59ff1d3936047c1800d0b9adaba5b647.tar.gz
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp90
-rw-r--r--library/cpp/grpc/server/grpc_server.h10
2 files changed, 50 insertions, 50 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 5d72f74d290..7437b7a8f5e 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -19,24 +19,24 @@ namespace NGrpc {
using NThreading::TFuture;
-static void PullEvents(grpc::ServerCompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_server");
- while (true) {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- if (cq->Next(&tag, &ok)) {
- IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
-
- if (!ev->Execute(ok)) {
- ev->DestroyRequest();
- }
- } else {
- break;
- }
- }
-}
-
+static void PullEvents(grpc::ServerCompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_server");
+ while (true) {
+ void* tag; // uniquely identifies a request.
+ bool ok;
+
+ if (cq->Next(&tag, &ok)) {
+ IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
+
+ if (!ev->Execute(ok)) {
+ ev->DestroyRequest();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
TGRpcServer::TGRpcServer(const TServerOptions& opts)
: Options_(opts)
, Limiter_(Options_.MaxGlobalRequestInFlight)
@@ -124,14 +124,14 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
- CQS_.push_back(builder.AddCompletionQueue());
- }
-
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+ } else {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+
if (Options_.GRpcMemoryQuotaBytes) {
// See details KIKIMR-6932
/*
@@ -149,27 +149,27 @@ void TGRpcServer::Start() {
if (!Server_) {
ythrow yexception() << "can't start grpc server on " << server_address;
}
-
- size_t index = 0;
+
+ size_t index = 0;
for (IGRpcServicePtr service : Services_) {
- // TODO: provide something else for services instead of ServerCompletionQueue
+ // TODO: provide something else for services instead of ServerCompletionQueue
service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
+ } else {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[0];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
}
if (Options_.ExternalListener) {
@@ -214,8 +214,8 @@ void TGRpcServer::Stop() {
}
// Always shutdown the completion queue after the server.
- for (auto& cq : CQS_) {
- cq->Shutdown();
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
}
for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 4d6626efb9a..d6814a90a0d 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -53,9 +53,9 @@ struct TServerOptions {
//! Number of worker threads.
DECLARE_FIELD(WorkerThreads, size_t, 2);
- //! Create one completion queue per thread
- DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
-
+ //! Create one completion queue per thread
+ DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
+
//! Memory quota size for grpc server in bytes. Zero means unlimited.
DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
@@ -328,7 +328,7 @@ private:
TAdaptiveLock Lock_;
};
-class TGRpcServer {
+class TGRpcServer {
public:
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
TGRpcServer(const TServerOptions& opts);
@@ -346,7 +346,7 @@ private:
const TServerOptions Options_;
std::unique_ptr<grpc::Server> Server_;
- std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;
TVector<IGRpcServicePtr> Services_;