aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.cpp
diff options
context:
space:
mode:
authorVladimir Gordiychuk <folyga@gmail.com>2022-02-10 16:50:21 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:21 +0300
commite7bf3caf59ff1d3936047c1800d0b9adaba5b647 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.cpp
parent9315561a79f8c08b28065daf027ef493ae27a6d2 (diff)
downloadydb-e7bf3caf59ff1d3936047c1800d0b9adaba5b647.tar.gz
Restoring authorship annotation for Vladimir Gordiychuk <folyga@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp118
1 files changed, 59 insertions, 59 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index 75f80767dd..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -5,8 +5,8 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/string/printf.h>
-#include <util/system/thread.h>
-#include <util/random/random.h>
+#include <util/system/thread.h>
+#include <util/random/random.h>
#if !defined(_WIN32) && !defined(_WIN64)
#include <sys/types.h>
@@ -175,24 +175,24 @@ void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TStri
LastUsedQueue_.erase(pos);
}
-static void PullEvents(grpc::CompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_client");
- while (true) {
- void* tag;
- bool ok;
-
- if (!cq->Next(&tag, &ok)) {
- break;
- }
-
- if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
- if (!ev->Execute(ok)) {
- ev->Destroy();
- }
- }
- }
-}
-
+static void PullEvents(grpc::CompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_client");
+ while (true) {
+ void* tag;
+ bool ok;
+
+ if (!cq->Next(&tag, &ok)) {
+ break;
+ }
+
+ if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
+ if (!ev->Execute(ok)) {
+ ev->Destroy();
+ }
+ }
+ }
+}
+
class TGRpcClientLow::TContextImpl final
: public std::enable_shared_from_this<TContextImpl>
, public IQueueClientContext
@@ -249,7 +249,7 @@ public:
// It's now safe to initialize parent and owner
child->Parent = std::move(self);
child->Owner = Owner;
- child->CQ = CQ;
+ child->CQ = CQ;
// Propagate cancellation to a child context
if (Cancelled.load(std::memory_order_relaxed)) {
@@ -262,7 +262,7 @@ public:
grpc::CompletionQueue* CompletionQueue() override {
Y_VERIFY(Owner, "Uninitialized context");
- return CQ;
+ return CQ;
}
bool IsCancelled() const override {
@@ -388,7 +388,7 @@ private:
// These fields are initialized on successful registration
TContextPtr Parent;
TGRpcClientLow* Owner = nullptr;
- grpc::CompletionQueue* CQ = nullptr;
+ grpc::CompletionQueue* CQ = nullptr;
// Some children are stored inline, others are in a set
std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
@@ -401,46 +401,46 @@ private:
std::atomic<bool> Cancelled;
};
-TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
- : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
-{
+TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
+ : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
+{
Init(numWorkerThread);
}
void TGRpcClientLow::Init(size_t numWorkerThread) {
SetCqState(WORKING);
- if (UseCompletionQueuePerThread_) {
- for (size_t i = 0; i < numWorkerThread; i++) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- } else {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- for (size_t i = 0; i < numWorkerThread; i++) {
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
+ if (UseCompletionQueuePerThread_) {
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
+ } else {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
}
}
void TGRpcClientLow::AddWorkerThreadForTest() {
- if (UseCompletionQueuePerThread_) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- } else {
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
+ if (UseCompletionQueuePerThread_) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ } else {
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
}
TGRpcClientLow::~TGRpcClientLow() {
@@ -503,9 +503,9 @@ void TGRpcClientLow::StopInternal(bool silent) {
}
if (shutdown) {
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
+ }
}
}
@@ -577,8 +577,8 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
if (shutdown) {
// This was the last context, shutdown CQ
- for (auto& cq : CQS_) {
- cq->Shutdown();
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
}
}
}