diff options
author | Vladimir Gordiychuk <folyga@gmail.com> | 2022-02-10 16:50:21 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:21 +0300 |
commit | e7bf3caf59ff1d3936047c1800d0b9adaba5b647 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.cpp | |
parent | 9315561a79f8c08b28065daf027ef493ae27a6d2 (diff) | |
download | ydb-e7bf3caf59ff1d3936047c1800d0b9adaba5b647.tar.gz |
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 118 |
1 files changed, 59 insertions, 59 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 75f80767dd..73cc908ef8 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -5,8 +5,8 @@ #include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/string/printf.h> -#include <util/system/thread.h> -#include <util/random/random.h> +#include <util/system/thread.h> +#include <util/random/random.h> #if !defined(_WIN32) && !defined(_WIN64) #include <sys/types.h> @@ -175,24 +175,24 @@ void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TStri LastUsedQueue_.erase(pos); } -static void PullEvents(grpc::CompletionQueue* cq) { - TThread::SetCurrentThreadName("grpc_client"); - while (true) { - void* tag; - bool ok; - - if (!cq->Next(&tag, &ok)) { - break; - } - - if (auto* ev = static_cast<IQueueClientEvent*>(tag)) { - if (!ev->Execute(ok)) { - ev->Destroy(); - } - } - } -} - +static void PullEvents(grpc::CompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_client"); + while (true) { + void* tag; + bool ok; + + if (!cq->Next(&tag, &ok)) { + break; + } + + if (auto* ev = static_cast<IQueueClientEvent*>(tag)) { + if (!ev->Execute(ok)) { + ev->Destroy(); + } + } + } +} + class TGRpcClientLow::TContextImpl final : public std::enable_shared_from_this<TContextImpl> , public IQueueClientContext @@ -249,7 +249,7 @@ public: // It's now safe to initialize parent and owner child->Parent = std::move(self); child->Owner = Owner; - child->CQ = CQ; + child->CQ = CQ; // Propagate cancellation to a child context if (Cancelled.load(std::memory_order_relaxed)) { @@ -262,7 +262,7 @@ public: grpc::CompletionQueue* CompletionQueue() override { Y_VERIFY(Owner, "Uninitialized context"); - return CQ; + return CQ; } bool IsCancelled() const override { @@ -388,7 +388,7 @@ private: // These fields are initialized on successful registration TContextPtr Parent; TGRpcClientLow* Owner = nullptr; - grpc::CompletionQueue* CQ = nullptr; + grpc::CompletionQueue* CQ = nullptr; // Some children are stored inline, others are in a set std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; @@ -401,46 +401,46 @@ private: std::atomic<bool> Cancelled; }; -TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) - : UseCompletionQueuePerThread_(useCompletionQueuePerThread) -{ +TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) + : UseCompletionQueuePerThread_(useCompletionQueuePerThread) +{ Init(numWorkerThread); } void TGRpcClientLow::Init(size_t numWorkerThread) { SetCqState(WORKING); - if (UseCompletionQueuePerThread_) { - for (size_t i = 0; i < numWorkerThread; i++) { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } - } else { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - for (size_t i = 0; i < numWorkerThread; i++) { - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } + if (UseCompletionQueuePerThread_) { + for (size_t i = 0; i < numWorkerThread; i++) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } + } else { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + for (size_t i = 0; i < numWorkerThread; i++) { + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } } } void TGRpcClientLow::AddWorkerThreadForTest() { - if (UseCompletionQueuePerThread_) { - CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } else { - auto* cq = CQS_.back().get(); - WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { - PullEvents(cq); - }).Release()); - } + if (UseCompletionQueuePerThread_) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } else { + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } } TGRpcClientLow::~TGRpcClientLow() { @@ -503,9 +503,9 @@ void TGRpcClientLow::StopInternal(bool silent) { } if (shutdown) { - for (auto& cq : CQS_) { - cq->Shutdown(); - } + for (auto& cq : CQS_) { + cq->Shutdown(); + } } } @@ -577,8 +577,8 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { if (shutdown) { // This was the last context, shutdown CQ - for (auto& cq : CQS_) { - cq->Shutdown(); + for (auto& cq : CQS_) { + cq->Shutdown(); } } } |