aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authorVladimir Gordiychuk <folyga@gmail.com>2022-02-10 16:50:21 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:21 +0300
commit9315561a79f8c08b28065daf027ef493ae27a6d2 (patch)
tree015bf2f66ecea81b37d6791fe2e8948e18e07186 /library/cpp/grpc
parent0dd632d6fc5676c75d7004172992cefaa2192db0 (diff)
downloadydb-9315561a79f8c08b28065daf027ef493ae27a6d2.tar.gz
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp118
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h8
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp90
-rw-r--r--library/cpp/grpc/server/grpc_server.h10
4 files changed, 113 insertions, 113 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index 73cc908ef8..75f80767dd 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -5,8 +5,8 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/string/printf.h>
-#include <util/system/thread.h>
-#include <util/random/random.h>
+#include <util/system/thread.h>
+#include <util/random/random.h>
#if !defined(_WIN32) && !defined(_WIN64)
#include <sys/types.h>
@@ -175,24 +175,24 @@ void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TStri
LastUsedQueue_.erase(pos);
}
-static void PullEvents(grpc::CompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_client");
- while (true) {
- void* tag;
- bool ok;
-
- if (!cq->Next(&tag, &ok)) {
- break;
- }
-
- if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
- if (!ev->Execute(ok)) {
- ev->Destroy();
- }
- }
- }
-}
-
+static void PullEvents(grpc::CompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_client");
+ while (true) {
+ void* tag;
+ bool ok;
+
+ if (!cq->Next(&tag, &ok)) {
+ break;
+ }
+
+ if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
+ if (!ev->Execute(ok)) {
+ ev->Destroy();
+ }
+ }
+ }
+}
+
class TGRpcClientLow::TContextImpl final
: public std::enable_shared_from_this<TContextImpl>
, public IQueueClientContext
@@ -249,7 +249,7 @@ public:
// It's now safe to initialize parent and owner
child->Parent = std::move(self);
child->Owner = Owner;
- child->CQ = CQ;
+ child->CQ = CQ;
// Propagate cancellation to a child context
if (Cancelled.load(std::memory_order_relaxed)) {
@@ -262,7 +262,7 @@ public:
grpc::CompletionQueue* CompletionQueue() override {
Y_VERIFY(Owner, "Uninitialized context");
- return CQ;
+ return CQ;
}
bool IsCancelled() const override {
@@ -388,7 +388,7 @@ private:
// These fields are initialized on successful registration
TContextPtr Parent;
TGRpcClientLow* Owner = nullptr;
- grpc::CompletionQueue* CQ = nullptr;
+ grpc::CompletionQueue* CQ = nullptr;
// Some children are stored inline, others are in a set
std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
@@ -401,46 +401,46 @@ private:
std::atomic<bool> Cancelled;
};
-TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
- : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
-{
+TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
+ : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
+{
Init(numWorkerThread);
}
void TGRpcClientLow::Init(size_t numWorkerThread) {
SetCqState(WORKING);
- if (UseCompletionQueuePerThread_) {
- for (size_t i = 0; i < numWorkerThread; i++) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- } else {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- for (size_t i = 0; i < numWorkerThread; i++) {
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
+ if (UseCompletionQueuePerThread_) {
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
+ } else {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
}
}
void TGRpcClientLow::AddWorkerThreadForTest() {
- if (UseCompletionQueuePerThread_) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- } else {
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
+ if (UseCompletionQueuePerThread_) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ } else {
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
}
TGRpcClientLow::~TGRpcClientLow() {
@@ -503,9 +503,9 @@ void TGRpcClientLow::StopInternal(bool silent) {
}
if (shutdown) {
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
+ }
}
}
@@ -577,8 +577,8 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
if (shutdown) {
// This was the last context, shutdown CQ
- for (auto& cq : CQS_) {
- cq->Shutdown();
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
}
}
}
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index ab0a0627be..627c88ca67 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -1320,7 +1320,7 @@ private:
};
class TGRpcClientLow
- : public IQueueClientContextProvider
+ : public IQueueClientContextProvider
{
class TContextImpl;
friend class TContextImpl;
@@ -1332,7 +1332,7 @@ class TGRpcClientLow
};
public:
- explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
+ explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
~TGRpcClientLow();
// Tries to stop all currently running requests (via their stop callbacks)
@@ -1372,7 +1372,7 @@ public:
private:
using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
- using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
+ using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
void Init(size_t numWorkerThread);
inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
@@ -1384,7 +1384,7 @@ private:
void ForgetContext(TContextImpl* context);
private:
- bool UseCompletionQueuePerThread_;
+ bool UseCompletionQueuePerThread_;
std::vector<CompletionQueueRef> CQS_;
std::vector<IThreadRef> WorkerThreads_;
TAtomic CqState_ = -1;
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 7437b7a8f5..5d72f74d29 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -19,24 +19,24 @@ namespace NGrpc {
using NThreading::TFuture;
-static void PullEvents(grpc::ServerCompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_server");
- while (true) {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- if (cq->Next(&tag, &ok)) {
- IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
-
- if (!ev->Execute(ok)) {
- ev->DestroyRequest();
- }
- } else {
- break;
- }
- }
-}
-
+static void PullEvents(grpc::ServerCompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_server");
+ while (true) {
+ void* tag; // uniquely identifies a request.
+ bool ok;
+
+ if (cq->Next(&tag, &ok)) {
+ IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
+
+ if (!ev->Execute(ok)) {
+ ev->DestroyRequest();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
TGRpcServer::TGRpcServer(const TServerOptions& opts)
: Options_(opts)
, Limiter_(Options_.MaxGlobalRequestInFlight)
@@ -124,14 +124,14 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
- CQS_.push_back(builder.AddCompletionQueue());
- }
-
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+ } else {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+
if (Options_.GRpcMemoryQuotaBytes) {
// See details KIKIMR-6932
/*
@@ -149,27 +149,27 @@ void TGRpcServer::Start() {
if (!Server_) {
ythrow yexception() << "can't start grpc server on " << server_address;
}
-
- size_t index = 0;
+
+ size_t index = 0;
for (IGRpcServicePtr service : Services_) {
- // TODO: provide something else for services instead of ServerCompletionQueue
+ // TODO: provide something else for services instead of ServerCompletionQueue
service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
+ } else {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[0];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
}
if (Options_.ExternalListener) {
@@ -214,8 +214,8 @@ void TGRpcServer::Stop() {
}
// Always shutdown the completion queue after the server.
- for (auto& cq : CQS_) {
- cq->Shutdown();
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
}
for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index d6814a90a0..4d6626efb9 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -53,9 +53,9 @@ struct TServerOptions {
//! Number of worker threads.
DECLARE_FIELD(WorkerThreads, size_t, 2);
- //! Create one completion queue per thread
- DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
-
+ //! Create one completion queue per thread
+ DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
+
//! Memory quota size for grpc server in bytes. Zero means unlimited.
DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
@@ -328,7 +328,7 @@ private:
TAdaptiveLock Lock_;
};
-class TGRpcServer {
+class TGRpcServer {
public:
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
TGRpcServer(const TServerOptions& opts);
@@ -346,7 +346,7 @@ private:
const TServerOptions Options_;
std::unique_ptr<grpc::Server> Server_;
- std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;
TVector<IGRpcServicePtr> Services_;