diff options
author | Vladimir Gordiychuk <folyga@gmail.com> | 2022-02-10 16:50:21 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:21 +0300 |
commit | 9315561a79f8c08b28065daf027ef493ae27a6d2 (patch) | |
tree | 015bf2f66ecea81b37d6791fe2e8948e18e07186 /library/cpp/grpc | |
parent | 0dd632d6fc5676c75d7004172992cefaa2192db0 (diff) | |
download | ydb-9315561a79f8c08b28065daf027ef493ae27a6d2.tar.gz |
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 118 | ||||
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 8 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 90 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 10 |
4 files changed, 113 insertions, 113 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 73cc908ef8..75f80767dd 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -5,8 +5,8 @@ #include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/string/printf.h> -#include <util/system/thread.h> -#include <util/random/random.h> +#include <util/system/thread.h> +#include <util/random/random.h> #if !defined(_WIN32) && !defined(_WIN64) #include <sys/types.h> @@ -175,24 +175,24 @@ void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TStri LastUsedQueue_.erase(pos); } -static void PullEvents(grpc::CompletionQueue* cq) { - TThread::SetCurrentThreadName("grpc_client"); - while (true) { - void* tag; - bool ok; - - if (!cq->Next(&tag, &ok)) { - break; - } - - if (auto* ev = static_cast<IQueueClientEvent*>(tag)) { - if (!ev->Execute(ok)) { - ev->Destroy(); - } - } - } -} - +static void PullEvents(grpc::CompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_client"); + while (true) { + void* tag; + bool ok; + + if (!cq->Next(&tag, &ok)) { + break; + } + + if (auto* ev = static_cast<IQueueClientEvent*>(tag)) { + if (!ev->Execute(ok)) { + ev->Destroy(); + } + } + } +} + class TGRpcClientLow::TContextImpl final : public std::enable_shared_from_this<TContextImpl> , public IQueueClientContext @@ -249,7 +249,7 @@ public: // It's now safe to initialize parent and owner child->Parent = std::move(self); child->Owner = Owner; - child->CQ = CQ; + child->CQ = CQ; // Propagate cancellation to a child context if (Cancelled.load(std::memory_order_relaxed)) { @@ -262,7 +262,7 @@ public: grpc::CompletionQueue* CompletionQueue() override { Y_VERIFY(Owner, "Uninitialized context"); - return CQ; + return CQ; } bool IsCancelled() const override { @@ -388,7 +388,7 @@ private: // These fields are initialized on successful registration TContextPtr Parent; TGRpcClientLow* Owner = nullptr; - grpc::CompletionQueue* CQ = nullptr; + grpc::CompletionQueue* CQ = nullptr; // Some children are stored inline, others are in a set std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; @@ -401,46 +401,46 @@ private: std::atomic<bool> Cancelled; }; -TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) - : UseCompletionQueuePerThread_(useCompletionQueuePerThread) -{ +TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) + : UseCompletionQueuePerThread_(useCompletionQueuePerThread) +{ Init(numWorkerThread); } void TGRpcClientLow::Init(size_t numWorkerThread) { SetCqState(WORKING); - if (UseCompletionQueuePerThread_) { - for (size_t i = 0; i < numWorkerThread; i++) { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } - } else { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - for (size_t i = 0; i < numWorkerThread; i++) { - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } + if (UseCompletionQueuePerThread_) { + for (size_t i = 0; i < numWorkerThread; i++) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } + } else { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + for (size_t i = 0; i < numWorkerThread; i++) { + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } } } void TGRpcClientLow::AddWorkerThreadForTest() { - if (UseCompletionQueuePerThread_) { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } else { - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } + if (UseCompletionQueuePerThread_) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } else { + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } } TGRpcClientLow::~TGRpcClientLow() { @@ -503,9 +503,9 @@ void TGRpcClientLow::StopInternal(bool silent) { } if (shutdown) { - for (auto& cq : CQS_) { - cq->Shutdown(); - } + for (auto& cq : CQS_) { + cq->Shutdown(); + } } } @@ -577,8 +577,8 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { if (shutdown) { // This was the last context, shutdown CQ - for (auto& cq : CQS_) { - cq->Shutdown(); + for (auto& cq : CQS_) { + cq->Shutdown(); } } } diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index ab0a0627be..627c88ca67 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -1320,7 +1320,7 @@ private: }; class TGRpcClientLow - : public IQueueClientContextProvider + : public IQueueClientContextProvider { class TContextImpl; friend class TContextImpl; @@ -1332,7 +1332,7 @@ class TGRpcClientLow }; public: - explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); + explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); ~TGRpcClientLow(); // Tries to stop all currently running requests (via their stop callbacks) @@ -1372,7 +1372,7 @@ public: private: using IThreadRef = std::unique_ptr<IThreadFactory::IThread>; - using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; + using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; void Init(size_t numWorkerThread); inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } @@ -1384,7 +1384,7 @@ private: void ForgetContext(TContextImpl* context); private: - bool UseCompletionQueuePerThread_; + bool UseCompletionQueuePerThread_; std::vector<CompletionQueueRef> CQS_; std::vector<IThreadRef> WorkerThreads_; TAtomic CqState_ = -1; diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 7437b7a8f5..5d72f74d29 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -19,24 +19,24 @@ namespace NGrpc { using NThreading::TFuture; -static void PullEvents(grpc::ServerCompletionQueue* cq) { - TThread::SetCurrentThreadName("grpc_server"); - while (true) { - void* tag; // uniquely identifies a request. - bool ok; - - if (cq->Next(&tag, &ok)) { - IQueueEvent* const ev(static_cast<IQueueEvent*>(tag)); - - if (!ev->Execute(ok)) { - ev->DestroyRequest(); - } - } else { - break; - } - } -} - +static void PullEvents(grpc::ServerCompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_server"); + while (true) { + void* tag; // uniquely identifies a request. + bool ok; + + if (cq->Next(&tag, &ok)) { + IQueueEvent* const ev(static_cast<IQueueEvent*>(tag)); + + if (!ev->Execute(ok)) { + ev->DestroyRequest(); + } + } else { + break; + } + } +} + TGRpcServer::TGRpcServer(const TServerOptions& opts) : Options_(opts) , Limiter_(Options_.MaxGlobalRequestInFlight) @@ -124,14 +124,14 @@ void TGRpcServer::Start() { builder.SetOption(std::make_unique<TKeepAliveOption>()); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - CQS_.push_back(builder.AddCompletionQueue()); - } - } else { - CQS_.push_back(builder.AddCompletionQueue()); - } - + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + CQS_.push_back(builder.AddCompletionQueue()); + } + } else { + CQS_.push_back(builder.AddCompletionQueue()); + } + if (Options_.GRpcMemoryQuotaBytes) { // See details KIKIMR-6932 /* @@ -149,27 +149,27 @@ void TGRpcServer::Start() { if (!Server_) { ythrow yexception() << "can't start grpc server on " << server_address; } - - size_t index = 0; + + size_t index = 0; for (IGRpcServicePtr service : Services_) { - // TODO: provide something else for services instead of ServerCompletionQueue + // TODO: provide something else for services instead of ServerCompletionQueue service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[i]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } - } else { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[0]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[i]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } + } else { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[0]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } } if (Options_.ExternalListener) { @@ -214,8 +214,8 @@ void TGRpcServer::Stop() { } // Always shutdown the completion queue after the server. - for (auto& cq : CQS_) { - cq->Shutdown(); + for (auto& cq : CQS_) { + cq->Shutdown(); } for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index d6814a90a0..4d6626efb9 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -53,9 +53,9 @@ struct TServerOptions { //! Number of worker threads. DECLARE_FIELD(WorkerThreads, size_t, 2); - //! Create one completion queue per thread - DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); - + //! Create one completion queue per thread + DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); + //! Memory quota size for grpc server in bytes. Zero means unlimited. DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); @@ -328,7 +328,7 @@ private: TAdaptiveLock Lock_; }; -class TGRpcServer { +class TGRpcServer { public: using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; TGRpcServer(const TServerOptions& opts); @@ -346,7 +346,7 @@ private: const TServerOptions Options_; std::unique_ptr<grpc::Server> Server_; - std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; TVector<IThreadRef> Ts; TVector<IGRpcServicePtr> Services_; |