path: root/library/cpp/grpc/client/grpc_client_low.cpp
author     Alexey Borzenkov <snaury@yandex-team.ru>  2022-02-10 16:47:43 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:47:43 +0300
commit     330c83f8c116bd45316397b179275e9d87007e7d (patch)
tree       c0748b5dcbade83af788c0abfa89c0383d6b779c  /library/cpp/grpc/client/grpc_client_low.cpp
parent     22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff)
download   ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r--  library/cpp/grpc/client/grpc_client_low.cpp | 574
1 file changed, 287 insertions, 287 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index bba83aee02..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -193,214 +193,214 @@ static void PullEvents(grpc::CompletionQueue* cq) {
}
}
-class TGRpcClientLow::TContextImpl final
- : public std::enable_shared_from_this<TContextImpl>
- , public IQueueClientContext
-{
- friend class TGRpcClientLow;
-
- using TCallback = std::function<void()>;
- using TContextPtr = std::shared_ptr<TContextImpl>;
-
-public:
+class TGRpcClientLow::TContextImpl final
+ : public std::enable_shared_from_this<TContextImpl>
+ , public IQueueClientContext
+{
+ friend class TGRpcClientLow;
+
+ using TCallback = std::function<void()>;
+ using TContextPtr = std::shared_ptr<TContextImpl>;
+
+public:
~TContextImpl() override {
- Y_VERIFY(CountChildren() == 0,
- "Destructor called with non-empty children");
-
- if (Parent) {
- Parent->ForgetContext(this);
- } else if (Y_LIKELY(Owner)) {
- Owner->ForgetContext(this);
- }
- }
-
- /**
- * Helper for locking child pointer from a parent container
- */
- static TContextPtr LockChildPtr(TContextImpl* ptr) {
- if (ptr) {
- // N.B. it is safe to do as long as it's done under a mutex and
- // pointer is among valid children. When that's the case we
- // know that TContextImpl destructor has not finished yet, so
- // the object is valid. The lock() method may return nullptr
- // though, if the object is being destructed right now.
- return ptr->weak_from_this().lock();
- } else {
- return nullptr;
- }
- }
-
- void ForgetContext(TContextImpl* child) {
- std::unique_lock<std::mutex> guard(Mutex);
-
- auto removed = RemoveChild(child);
- Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
- }
-
- IQueueClientContextPtr CreateContext() override {
- auto self = shared_from_this();
- auto child = std::make_shared<TContextImpl>();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- AddChild(child.get());
-
- // It's now safe to initialize parent and owner
- child->Parent = std::move(self);
- child->Owner = Owner;
+ Y_VERIFY(CountChildren() == 0,
+ "Destructor called with non-empty children");
+
+ if (Parent) {
+ Parent->ForgetContext(this);
+ } else if (Y_LIKELY(Owner)) {
+ Owner->ForgetContext(this);
+ }
+ }
+
+ /**
+ * Helper for locking child pointer from a parent container
+ */
+ static TContextPtr LockChildPtr(TContextImpl* ptr) {
+ if (ptr) {
+ // N.B. it is safe to do as long as it's done under a mutex and
+ // pointer is among valid children. When that's the case we
+ // know that TContextImpl destructor has not finished yet, so
+ // the object is valid. The lock() method may return nullptr
+ // though, if the object is being destructed right now.
+ return ptr->weak_from_this().lock();
+ } else {
+ return nullptr;
+ }
+ }
+
+ void ForgetContext(TContextImpl* child) {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ auto removed = RemoveChild(child);
+ Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
+ }
+
+ IQueueClientContextPtr CreateContext() override {
+ auto self = shared_from_this();
+ auto child = std::make_shared<TContextImpl>();
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ AddChild(child.get());
+
+ // It's now safe to initialize parent and owner
+ child->Parent = std::move(self);
+ child->Owner = Owner;
child->CQ = CQ;
-
- // Propagate cancellation to a child context
- if (Cancelled.load(std::memory_order_relaxed)) {
- child->Cancelled.store(true, std::memory_order_relaxed);
- }
- }
-
- return child;
- }
-
- grpc::CompletionQueue* CompletionQueue() override {
- Y_VERIFY(Owner, "Uninitialized context");
+
+ // Propagate cancellation to a child context
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ child->Cancelled.store(true, std::memory_order_relaxed);
+ }
+ }
+
+ return child;
+ }
+
+ grpc::CompletionQueue* CompletionQueue() override {
+ Y_VERIFY(Owner, "Uninitialized context");
return CQ;
- }
-
- bool IsCancelled() const override {
- return Cancelled.load(std::memory_order_acquire);
- }
-
- bool Cancel() override {
- TStackVec<TCallback, 1> callbacks;
- TStackVec<TContextPtr, 2> children;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (Cancelled.load(std::memory_order_relaxed)) {
- // Already cancelled in another thread
- return false;
- }
-
- callbacks.reserve(Callbacks.size());
- children.reserve(CountChildren());
-
- for (auto& callback : Callbacks) {
- callbacks.emplace_back().swap(callback);
- }
- Callbacks.clear();
-
- // Collect all children we need to cancel
- // N.B. we don't clear children links (cleared by destructors)
- // N.B. some children may be stuck in destructors at the moment
- for (TContextImpl* ptr : InlineChildren) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
- for (auto* ptr : Children) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
-
- Cancelled.store(true, std::memory_order_release);
- }
-
- // Call directly subscribed callbacks
- if (callbacks) {
- RunCallbacksNoExcept(callbacks);
- }
-
- // Cancel all children
- for (auto& child : children) {
- child->Cancel();
- child.reset();
- }
-
- return true;
- }
-
- void SubscribeCancel(TCallback callback) override {
- Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (!Cancelled.load(std::memory_order_relaxed)) {
- Callbacks.emplace_back().swap(callback);
- return;
- }
- }
-
- // Already cancelled, run immediately
- callback();
- }
-
-private:
- void AddChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (!slot) {
- slot = child;
- return;
- }
- }
-
- Children.insert(child);
- }
-
- bool RemoveChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (slot == child) {
- slot = nullptr;
- return true;
- }
- }
-
- return Children.erase(child);
- }
-
- size_t CountChildren() {
- size_t count = 0;
-
- for (TContextImpl* ptr : InlineChildren) {
- if (ptr) {
- ++count;
- }
- }
-
- return count + Children.size();
- }
-
- template<class TCallbacks>
- static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
- for (auto& callback : callbacks) {
- if (callback) {
- callback();
- callback = nullptr;
- }
- }
- }
-
-private:
- // We want a simple lock here, without extra memory allocations
- std::mutex Mutex;
-
- // These fields are initialized on successful registration
- TContextPtr Parent;
- TGRpcClientLow* Owner = nullptr;
+ }
+
+ bool IsCancelled() const override {
+ return Cancelled.load(std::memory_order_acquire);
+ }
+
+ bool Cancel() override {
+ TStackVec<TCallback, 1> callbacks;
+ TStackVec<TContextPtr, 2> children;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ // Already cancelled in another thread
+ return false;
+ }
+
+ callbacks.reserve(Callbacks.size());
+ children.reserve(CountChildren());
+
+ for (auto& callback : Callbacks) {
+ callbacks.emplace_back().swap(callback);
+ }
+ Callbacks.clear();
+
+ // Collect all children we need to cancel
+ // N.B. we don't clear children links (cleared by destructors)
+ // N.B. some children may be stuck in destructors at the moment
+ for (TContextImpl* ptr : InlineChildren) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+ for (auto* ptr : Children) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+
+ Cancelled.store(true, std::memory_order_release);
+ }
+
+ // Call directly subscribed callbacks
+ if (callbacks) {
+ RunCallbacksNoExcept(callbacks);
+ }
+
+ // Cancel all children
+ for (auto& child : children) {
+ child->Cancel();
+ child.reset();
+ }
+
+ return true;
+ }
+
+ void SubscribeCancel(TCallback callback) override {
+ Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ if (!Cancelled.load(std::memory_order_relaxed)) {
+ Callbacks.emplace_back().swap(callback);
+ return;
+ }
+ }
+
+ // Already cancelled, run immediately
+ callback();
+ }
+
+private:
+ void AddChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (!slot) {
+ slot = child;
+ return;
+ }
+ }
+
+ Children.insert(child);
+ }
+
+ bool RemoveChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (slot == child) {
+ slot = nullptr;
+ return true;
+ }
+ }
+
+ return Children.erase(child);
+ }
+
+ size_t CountChildren() {
+ size_t count = 0;
+
+ for (TContextImpl* ptr : InlineChildren) {
+ if (ptr) {
+ ++count;
+ }
+ }
+
+ return count + Children.size();
+ }
+
+ template<class TCallbacks>
+ static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
+ for (auto& callback : callbacks) {
+ if (callback) {
+ callback();
+ callback = nullptr;
+ }
+ }
+ }
+
+private:
+ // We want a simple lock here, without extra memory allocations
+ std::mutex Mutex;
+
+ // These fields are initialized on successful registration
+ TContextPtr Parent;
+ TGRpcClientLow* Owner = nullptr;
grpc::CompletionQueue* CQ = nullptr;
-
- // Some children are stored inline, others are in a set
- std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
+
+ // Some children are stored inline, others are in a set
+ std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
std::unordered_set<TContextImpl*> Children;
-
- // Single callback is stored without extra allocations
- TStackVec<TCallback, 1> Callbacks;
-
- // Atomic flag for a faster IsCancelled() implementation
- std::atomic<bool> Cancelled;
-};
-
+
+ // Single callback is stored without extra allocations
+ TStackVec<TCallback, 1> Callbacks;
+
+ // Atomic flag for a faster IsCancelled() implementation
+ std::atomic<bool> Cancelled;
+};
+
TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
: UseCompletionQueuePerThread_(useCompletionQueuePerThread)
{
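
Reading note: in every hunk of this diff the removed and re-added lines are byte-for-byte identical, because the commit only restores authorship annotation (see the commit message above). The substance of the hunk above is TContextImpl, a node in a tree of cancellable contexts: parents track children by raw pointer (two inline slots plus an overflow set), children keep their parent alive through a shared_ptr, and a parent promotes a raw child pointer with weak_from_this().lock() while holding its mutex. A minimal standalone sketch of that pattern, with hypothetical names (TNode, CreateChild, LockChild, Forget) and the child-count verification elided:

    #include <memory>
    #include <mutex>
    #include <unordered_set>

    class TNode : public std::enable_shared_from_this<TNode> {
    public:
        ~TNode() {
            if (Parent) {
                Parent->Forget(this);
            }
        }

        std::shared_ptr<TNode> CreateChild() {
            auto self = shared_from_this();
            auto child = std::make_shared<TNode>();
            std::lock_guard<std::mutex> guard(Mutex);
            Children.insert(child.get());    // raw pointer: no ownership cycle
            child->Parent = std::move(self); // child keeps its parent alive
            return child;
        }

        // Safe while ptr is still registered under the parent's mutex: the
        // child's destructor has not finished (it must call Forget() first),
        // so weak_from_this() is still valid. lock() returns nullptr if the
        // child is already being destroyed, and the caller simply skips it.
        static std::shared_ptr<TNode> LockChild(TNode* ptr) {
            return ptr ? ptr->weak_from_this().lock() : nullptr;
        }

    private:
        void Forget(TNode* child) {
            std::lock_guard<std::mutex> guard(Mutex);
            Children.erase(child);
        }

        std::mutex Mutex;
        std::shared_ptr<TNode> Parent;
        std::unordered_set<TNode*> Children;
    };

The raw back-pointers cannot form a reference cycle, and lock() returning nullptr cleanly skips a child whose destructor is already running; this is the same reasoning as the N.B. comment in LockChildPtr() above.
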
@@ -408,7 +408,7 @@ TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePe
}
void TGRpcClientLow::Init(size_t numWorkerThread) {
- SetCqState(WORKING);
+ SetCqState(WORKING);
if (UseCompletionQueuePerThread_) {
for (size_t i = 0; i < numWorkerThread; i++) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
@@ -428,7 +428,7 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
}
}
-void TGRpcClientLow::AddWorkerThreadForTest() {
+void TGRpcClientLow::AddWorkerThreadForTest() {
if (UseCompletionQueuePerThread_) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
auto* cq = CQS_.back().get();
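
The Init() and AddWorkerThreadForTest() hunks show the two threading models behind UseCompletionQueuePerThread_: either every worker thread owns a private grpc::CompletionQueue, or all workers poll one shared queue. Each worker runs the PullEvents() named in the first hunk header. A hedged sketch of such a tag-dispatch loop follows; the IEvent interface and its Execute()/Destroy() contract are illustrative assumptions, not this library's actual event API:

    #include <grpcpp/grpcpp.h>

    // Assumed event interface for illustration only.
    struct IEvent {
        // Returns false when the event is finished and should be destroyed.
        virtual bool Execute(bool ok) = 0;
        virtual void Destroy() = 0;
        virtual ~IEvent() = default;
    };

    static void PullEventsSketch(grpc::CompletionQueue* cq) {
        void* tag = nullptr;
        bool ok = false;
        // Next() blocks until an event is available and returns false only
        // after Shutdown() has been called and the queue has drained.
        while (cq->Next(&tag, &ok)) {
            auto* event = static_cast<IEvent*>(tag);
            if (!event->Execute(ok)) {
                event->Destroy();
            }
        }
    }

The shutdown paths below rely on exactly this property: once a queue's Shutdown() is called and its backlog drains, Next() returns false and the worker thread exits.
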
@@ -441,75 +441,75 @@ void TGRpcClientLow::AddWorkerThreadForTest() {
PullEvents(cq);
}).Release());
}
-}
-
-TGRpcClientLow::~TGRpcClientLow() {
- StopInternal(true);
- WaitInternal();
-}
-
-void TGRpcClientLow::Stop(bool wait) {
- StopInternal(false);
-
- if (wait) {
- WaitInternal();
+}
+
+TGRpcClientLow::~TGRpcClientLow() {
+ StopInternal(true);
+ WaitInternal();
+}
+
+void TGRpcClientLow::Stop(bool wait) {
+ StopInternal(false);
+
+ if (wait) {
+ WaitInternal();
}
}
-void TGRpcClientLow::StopInternal(bool silent) {
- bool shutdown;
-
- TVector<TContextImpl::TContextPtr> cancelQueue;
-
+void TGRpcClientLow::StopInternal(bool silent) {
+ bool shutdown;
+
+ TVector<TContextImpl::TContextPtr> cancelQueue;
+
{
std::unique_lock<std::mutex> guard(Mtx_);
- auto allowStateChange = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- return !silent;
- case STOP_EXPLICIT:
- return false;
- }
-
- Y_UNREACHABLE();
- };
-
- if (!allowStateChange()) {
- // Completion queue is already stopping
- return;
- }
-
- SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
-
+ auto allowStateChange = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ case STOP_SILENT:
+ return !silent;
+ case STOP_EXPLICIT:
+ return false;
+ }
+
+ Y_UNREACHABLE();
+ };
+
+ if (!allowStateChange()) {
+ // Completion queue is already stopping
+ return;
+ }
+
+ SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
+
if (!silent && !Contexts_.empty()) {
- cancelQueue.reserve(Contexts_.size());
- for (auto* ptr : Contexts_) {
- // N.B. some contexts may be stuck in destructors
- if (auto context = TContextImpl::LockChildPtr(ptr)) {
- cancelQueue.emplace_back(std::move(context));
- }
- }
- }
-
+ cancelQueue.reserve(Contexts_.size());
+ for (auto* ptr : Contexts_) {
+ // N.B. some contexts may be stuck in destructors
+ if (auto context = TContextImpl::LockChildPtr(ptr)) {
+ cancelQueue.emplace_back(std::move(context));
+ }
+ }
+ }
+
shutdown = Contexts_.empty();
- }
-
- for (auto& context : cancelQueue) {
- context->Cancel();
- context.reset();
- }
-
- if (shutdown) {
+ }
+
+ for (auto& context : cancelQueue) {
+ context->Cancel();
+ context.reset();
+ }
+
+ if (shutdown) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
}
-}
-
-void TGRpcClientLow::WaitInternal() {
+}
+
+void TGRpcClientLow::WaitInternal() {
std::unique_lock<std::mutex> guard(JoinMutex_);
for (auto& ti : WorkerThreads_) {
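
StopInternal() above gates every transition through a three-state completion-queue lifecycle. Restated as a hedged standalone sketch (the enumerator names mirror the code, but the ECqState declaration lives in the header, so its exact form is assumed):

    enum ECqState { WORKING, STOP_SILENT, STOP_EXPLICIT };

    // WORKING may always begin stopping; a silent stop (from the destructor)
    // may still be upgraded to an explicit one so that live contexts get
    // cancelled; an explicit stop is final.
    static bool AllowStateChange(ECqState current, bool silent) {
        switch (current) {
            case WORKING:       return true;
            case STOP_SILENT:   return !silent;
            case STOP_EXPLICIT: return false;
        }
        return false; // unreachable for a valid state
    }

Only when the transition is allowed does StopInternal() snapshot the live contexts via LockChildPtr() (skipping any stuck in destructors), cancel them outside the mutex, and shut the queues down once Contexts_ is empty.
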
@@ -517,7 +517,7 @@ void TGRpcClientLow::WaitInternal() {
}
}
-void TGRpcClientLow::WaitIdle() {
+void TGRpcClientLow::WaitIdle() {
std::unique_lock<std::mutex> guard(Mtx_);
while (!Contexts_.empty()) {
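
The hunk above cuts WaitIdle() off at its wait loop. Judging from the ContextsEmpty_.notify_all() in ForgetContext() further down, the elided body is presumably the standard condition-variable wait; a sketch assuming ContextsEmpty_ is a std::condition_variable:

    std::unique_lock<std::mutex> guard(Mtx_);
    while (!Contexts_.empty()) {
        ContextsEmpty_.wait(guard); // assumed: woken by ForgetContext()
    }
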
@@ -525,7 +525,7 @@ void TGRpcClientLow::WaitIdle() {
}
}
-std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
+std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
std::unique_lock<std::mutex> guard(Mtx_);
auto allowCreateContext = [&]() {
@@ -535,15 +535,15 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
case STOP_SILENT:
case STOP_EXPLICIT:
return false;
- }
-
+ }
+
Y_UNREACHABLE();
};
if (!allowCreateContext()) {
// New context creation is forbidden
return nullptr;
- }
+ }
auto context = std::make_shared<TContextImpl>();
Contexts_.insert(context.get());
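
The CreateContext() hunks complete the client surface: creation is refused once stopping has begun, and a new root context is registered in Contexts_ and bound to a completion queue. A hedged usage sketch of the API as visible from this file alone (the constructor signature is taken from the hunk above; everything else about the header is assumed):

    void UsageSketch() {
        TGRpcClientLow client(/* numWorkerThread */ 2,
                              /* useCompletionQueuePerThread */ false);

        if (auto context = client.CreateContext()) { // nullptr once stopping
            auto child = context->CreateContext();   // contexts form a tree
            child->SubscribeCancel([] {
                // Runs on cancellation; fires immediately if the context
                // was already cancelled at subscription time.
            });
            context->Cancel();              // propagates down to the child
            Y_VERIFY(child->IsCancelled()); // visible via the atomic flag
        }
    }
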
@@ -554,32 +554,32 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
context->CQ = CQS_[0].get();
}
return context;
-}
-
-void TGRpcClientLow::ForgetContext(TContextImpl* context) {
- bool shutdown = false;
-
+}
+
+void TGRpcClientLow::ForgetContext(TContextImpl* context) {
+ bool shutdown = false;
+
{
std::unique_lock<std::mutex> guard(Mtx_);
- if (!Contexts_.erase(context)) {
- Y_FAIL("Unexpected ForgetContext(%p)", context);
- }
-
- if (Contexts_.empty()) {
- if (IsStopping()) {
- shutdown = true;
- }
-
+ if (!Contexts_.erase(context)) {
+ Y_FAIL("Unexpected ForgetContext(%p)", context);
+ }
+
+ if (Contexts_.empty()) {
+ if (IsStopping()) {
+ shutdown = true;
+ }
+
ContextsEmpty_.notify_all();
- }
- }
-
- if (shutdown) {
- // This was the last context, shutdown CQ
+ }
+ }
+
+ if (shutdown) {
+ // This was the last context, shutdown CQ
for (auto& cq : CQS_) {
cq->Shutdown();
- }
+ }
}
}
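
ForgetContext() closes the shutdown loop: erasing the last context wakes WaitIdle() and, if the client is stopping, shuts down the completion queues, which makes Next() return false in each worker so that WaitInternal() can join the threads. A hedged end-to-end sketch of that ordering (commentary, not committed code):

    void ShutdownOrderSketch() {
        TGRpcClientLow client(1, /* useCompletionQueuePerThread */ false);
        auto context = client.CreateContext(); // registered in Contexts_
        // ... issue calls through the context ...
        context.reset(); // ~TContextImpl -> ForgetContext(); the client is
                         // not stopping yet, so this only wakes WaitIdle()
    }   // ~TGRpcClientLow: StopInternal(true) sees Contexts_ empty and calls
        // cq->Shutdown(); Next() then returns false in each worker, the
        // worker threads exit, and WaitInternal() joins them.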