diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
commit | 330c83f8c116bd45316397b179275e9d87007e7d (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/grpc/client/grpc_client_low.cpp | |
parent | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff) | |
download | ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 574 |
1 files changed, 287 insertions, 287 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index bba83aee02..73cc908ef8 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -193,214 +193,214 @@ static void PullEvents(grpc::CompletionQueue* cq) { } } -class TGRpcClientLow::TContextImpl final - : public std::enable_shared_from_this<TContextImpl> - , public IQueueClientContext -{ - friend class TGRpcClientLow; - - using TCallback = std::function<void()>; - using TContextPtr = std::shared_ptr<TContextImpl>; - -public: +class TGRpcClientLow::TContextImpl final + : public std::enable_shared_from_this<TContextImpl> + , public IQueueClientContext +{ + friend class TGRpcClientLow; + + using TCallback = std::function<void()>; + using TContextPtr = std::shared_ptr<TContextImpl>; + +public: ~TContextImpl() override { - Y_VERIFY(CountChildren() == 0, - "Destructor called with non-empty children"); - - if (Parent) { - Parent->ForgetContext(this); - } else if (Y_LIKELY(Owner)) { - Owner->ForgetContext(this); - } - } - - /** - * Helper for locking child pointer from a parent container - */ - static TContextPtr LockChildPtr(TContextImpl* ptr) { - if (ptr) { - // N.B. it is safe to do as long as it's done under a mutex and - // pointer is among valid children. When that's the case we - // know that TContextImpl destructor has not finished yet, so - // the object is valid. The lock() method may return nullptr - // though, if the object is being destructed right now. - return ptr->weak_from_this().lock(); - } else { - return nullptr; - } - } - - void ForgetContext(TContextImpl* child) { - std::unique_lock<std::mutex> guard(Mutex); - - auto removed = RemoveChild(child); - Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child); - } - - IQueueClientContextPtr CreateContext() override { - auto self = shared_from_this(); - auto child = std::make_shared<TContextImpl>(); - - { - std::unique_lock<std::mutex> guard(Mutex); - - AddChild(child.get()); - - // It's now safe to initialize parent and owner - child->Parent = std::move(self); - child->Owner = Owner; + Y_VERIFY(CountChildren() == 0, + "Destructor called with non-empty children"); + + if (Parent) { + Parent->ForgetContext(this); + } else if (Y_LIKELY(Owner)) { + Owner->ForgetContext(this); + } + } + + /** + * Helper for locking child pointer from a parent container + */ + static TContextPtr LockChildPtr(TContextImpl* ptr) { + if (ptr) { + // N.B. it is safe to do as long as it's done under a mutex and + // pointer is among valid children. When that's the case we + // know that TContextImpl destructor has not finished yet, so + // the object is valid. The lock() method may return nullptr + // though, if the object is being destructed right now. + return ptr->weak_from_this().lock(); + } else { + return nullptr; + } + } + + void ForgetContext(TContextImpl* child) { + std::unique_lock<std::mutex> guard(Mutex); + + auto removed = RemoveChild(child); + Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child); + } + + IQueueClientContextPtr CreateContext() override { + auto self = shared_from_this(); + auto child = std::make_shared<TContextImpl>(); + + { + std::unique_lock<std::mutex> guard(Mutex); + + AddChild(child.get()); + + // It's now safe to initialize parent and owner + child->Parent = std::move(self); + child->Owner = Owner; child->CQ = CQ; - - // Propagate cancellation to a child context - if (Cancelled.load(std::memory_order_relaxed)) { - child->Cancelled.store(true, std::memory_order_relaxed); - } - } - - return child; - } - - grpc::CompletionQueue* CompletionQueue() override { - Y_VERIFY(Owner, "Uninitialized context"); + + // Propagate cancellation to a child context + if (Cancelled.load(std::memory_order_relaxed)) { + child->Cancelled.store(true, std::memory_order_relaxed); + } + } + + return child; + } + + grpc::CompletionQueue* CompletionQueue() override { + Y_VERIFY(Owner, "Uninitialized context"); return CQ; - } - - bool IsCancelled() const override { - return Cancelled.load(std::memory_order_acquire); - } - - bool Cancel() override { - TStackVec<TCallback, 1> callbacks; - TStackVec<TContextPtr, 2> children; - - { - std::unique_lock<std::mutex> guard(Mutex); - - if (Cancelled.load(std::memory_order_relaxed)) { - // Already cancelled in another thread - return false; - } - - callbacks.reserve(Callbacks.size()); - children.reserve(CountChildren()); - - for (auto& callback : Callbacks) { - callbacks.emplace_back().swap(callback); - } - Callbacks.clear(); - - // Collect all children we need to cancel - // N.B. we don't clear children links (cleared by destructors) - // N.B. some children may be stuck in destructors at the moment - for (TContextImpl* ptr : InlineChildren) { - if (auto child = LockChildPtr(ptr)) { - children.emplace_back(std::move(child)); - } - } - for (auto* ptr : Children) { - if (auto child = LockChildPtr(ptr)) { - children.emplace_back(std::move(child)); - } - } - - Cancelled.store(true, std::memory_order_release); - } - - // Call directly subscribed callbacks - if (callbacks) { - RunCallbacksNoExcept(callbacks); - } - - // Cancel all children - for (auto& child : children) { - child->Cancel(); - child.reset(); - } - - return true; - } - - void SubscribeCancel(TCallback callback) override { - Y_VERIFY(callback, "SubscribeCancel called with an empty callback"); - - { - std::unique_lock<std::mutex> guard(Mutex); - - if (!Cancelled.load(std::memory_order_relaxed)) { - Callbacks.emplace_back().swap(callback); - return; - } - } - - // Already cancelled, run immediately - callback(); - } - -private: - void AddChild(TContextImpl* child) { - for (TContextImpl*& slot : InlineChildren) { - if (!slot) { - slot = child; - return; - } - } - - Children.insert(child); - } - - bool RemoveChild(TContextImpl* child) { - for (TContextImpl*& slot : InlineChildren) { - if (slot == child) { - slot = nullptr; - return true; - } - } - - return Children.erase(child); - } - - size_t CountChildren() { - size_t count = 0; - - for (TContextImpl* ptr : InlineChildren) { - if (ptr) { - ++count; - } - } - - return count + Children.size(); - } - - template<class TCallbacks> - static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept { - for (auto& callback : callbacks) { - if (callback) { - callback(); - callback = nullptr; - } - } - } - -private: - // We want a simple lock here, without extra memory allocations - std::mutex Mutex; - - // These fields are initialized on successful registration - TContextPtr Parent; - TGRpcClientLow* Owner = nullptr; + } + + bool IsCancelled() const override { + return Cancelled.load(std::memory_order_acquire); + } + + bool Cancel() override { + TStackVec<TCallback, 1> callbacks; + TStackVec<TContextPtr, 2> children; + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (Cancelled.load(std::memory_order_relaxed)) { + // Already cancelled in another thread + return false; + } + + callbacks.reserve(Callbacks.size()); + children.reserve(CountChildren()); + + for (auto& callback : Callbacks) { + callbacks.emplace_back().swap(callback); + } + Callbacks.clear(); + + // Collect all children we need to cancel + // N.B. we don't clear children links (cleared by destructors) + // N.B. some children may be stuck in destructors at the moment + for (TContextImpl* ptr : InlineChildren) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + for (auto* ptr : Children) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + + Cancelled.store(true, std::memory_order_release); + } + + // Call directly subscribed callbacks + if (callbacks) { + RunCallbacksNoExcept(callbacks); + } + + // Cancel all children + for (auto& child : children) { + child->Cancel(); + child.reset(); + } + + return true; + } + + void SubscribeCancel(TCallback callback) override { + Y_VERIFY(callback, "SubscribeCancel called with an empty callback"); + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (!Cancelled.load(std::memory_order_relaxed)) { + Callbacks.emplace_back().swap(callback); + return; + } + } + + // Already cancelled, run immediately + callback(); + } + +private: + void AddChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (!slot) { + slot = child; + return; + } + } + + Children.insert(child); + } + + bool RemoveChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (slot == child) { + slot = nullptr; + return true; + } + } + + return Children.erase(child); + } + + size_t CountChildren() { + size_t count = 0; + + for (TContextImpl* ptr : InlineChildren) { + if (ptr) { + ++count; + } + } + + return count + Children.size(); + } + + template<class TCallbacks> + static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept { + for (auto& callback : callbacks) { + if (callback) { + callback(); + callback = nullptr; + } + } + } + +private: + // We want a simple lock here, without extra memory allocations + std::mutex Mutex; + + // These fields are initialized on successful registration + TContextPtr Parent; + TGRpcClientLow* Owner = nullptr; grpc::CompletionQueue* CQ = nullptr; - - // Some children are stored inline, others are in a set - std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; + + // Some children are stored inline, others are in a set + std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; std::unordered_set<TContextImpl*> Children; - - // Single callback is stored without extra allocations - TStackVec<TCallback, 1> Callbacks; - - // Atomic flag for a faster IsCancelled() implementation - std::atomic<bool> Cancelled; -}; - + + // Single callback is stored without extra allocations + TStackVec<TCallback, 1> Callbacks; + + // Atomic flag for a faster IsCancelled() implementation + std::atomic<bool> Cancelled; +}; + TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) : UseCompletionQueuePerThread_(useCompletionQueuePerThread) { @@ -408,7 +408,7 @@ TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePe } void TGRpcClientLow::Init(size_t numWorkerThread) { - SetCqState(WORKING); + SetCqState(WORKING); if (UseCompletionQueuePerThread_) { for (size_t i = 0; i < numWorkerThread; i++) { CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); @@ -428,7 +428,7 @@ void TGRpcClientLow::Init(size_t numWorkerThread) { } } -void TGRpcClientLow::AddWorkerThreadForTest() { +void TGRpcClientLow::AddWorkerThreadForTest() { if (UseCompletionQueuePerThread_) { CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); auto* cq = CQS_.back().get(); @@ -441,75 +441,75 @@ void TGRpcClientLow::AddWorkerThreadForTest() { PullEvents(cq); }).Release()); } -} - -TGRpcClientLow::~TGRpcClientLow() { - StopInternal(true); - WaitInternal(); -} - -void TGRpcClientLow::Stop(bool wait) { - StopInternal(false); - - if (wait) { - WaitInternal(); +} + +TGRpcClientLow::~TGRpcClientLow() { + StopInternal(true); + WaitInternal(); +} + +void TGRpcClientLow::Stop(bool wait) { + StopInternal(false); + + if (wait) { + WaitInternal(); } } -void TGRpcClientLow::StopInternal(bool silent) { - bool shutdown; - - TVector<TContextImpl::TContextPtr> cancelQueue; - +void TGRpcClientLow::StopInternal(bool silent) { + bool shutdown; + + TVector<TContextImpl::TContextPtr> cancelQueue; + { std::unique_lock<std::mutex> guard(Mtx_); - auto allowStateChange = [&]() { - switch (GetCqState()) { - case WORKING: - return true; - case STOP_SILENT: - return !silent; - case STOP_EXPLICIT: - return false; - } - - Y_UNREACHABLE(); - }; - - if (!allowStateChange()) { - // Completion queue is already stopping - return; - } - - SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); - + auto allowStateChange = [&]() { + switch (GetCqState()) { + case WORKING: + return true; + case STOP_SILENT: + return !silent; + case STOP_EXPLICIT: + return false; + } + + Y_UNREACHABLE(); + }; + + if (!allowStateChange()) { + // Completion queue is already stopping + return; + } + + SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); + if (!silent && !Contexts_.empty()) { - cancelQueue.reserve(Contexts_.size()); - for (auto* ptr : Contexts_) { - // N.B. some contexts may be stuck in destructors - if (auto context = TContextImpl::LockChildPtr(ptr)) { - cancelQueue.emplace_back(std::move(context)); - } - } - } - + cancelQueue.reserve(Contexts_.size()); + for (auto* ptr : Contexts_) { + // N.B. some contexts may be stuck in destructors + if (auto context = TContextImpl::LockChildPtr(ptr)) { + cancelQueue.emplace_back(std::move(context)); + } + } + } + shutdown = Contexts_.empty(); - } - - for (auto& context : cancelQueue) { - context->Cancel(); - context.reset(); - } - - if (shutdown) { + } + + for (auto& context : cancelQueue) { + context->Cancel(); + context.reset(); + } + + if (shutdown) { for (auto& cq : CQS_) { cq->Shutdown(); } } -} - -void TGRpcClientLow::WaitInternal() { +} + +void TGRpcClientLow::WaitInternal() { std::unique_lock<std::mutex> guard(JoinMutex_); for (auto& ti : WorkerThreads_) { @@ -517,7 +517,7 @@ void TGRpcClientLow::WaitInternal() { } } -void TGRpcClientLow::WaitIdle() { +void TGRpcClientLow::WaitIdle() { std::unique_lock<std::mutex> guard(Mtx_); while (!Contexts_.empty()) { @@ -525,7 +525,7 @@ void TGRpcClientLow::WaitIdle() { } } -std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { +std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { std::unique_lock<std::mutex> guard(Mtx_); auto allowCreateContext = [&]() { @@ -535,15 +535,15 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { case STOP_SILENT: case STOP_EXPLICIT: return false; - } - + } + Y_UNREACHABLE(); }; if (!allowCreateContext()) { // New context creation is forbidden return nullptr; - } + } auto context = std::make_shared<TContextImpl>(); Contexts_.insert(context.get()); @@ -554,32 +554,32 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { context->CQ = CQS_[0].get(); } return context; -} - -void TGRpcClientLow::ForgetContext(TContextImpl* context) { - bool shutdown = false; - +} + +void TGRpcClientLow::ForgetContext(TContextImpl* context) { + bool shutdown = false; + { std::unique_lock<std::mutex> guard(Mtx_); - if (!Contexts_.erase(context)) { - Y_FAIL("Unexpected ForgetContext(%p)", context); - } - - if (Contexts_.empty()) { - if (IsStopping()) { - shutdown = true; - } - + if (!Contexts_.erase(context)) { + Y_FAIL("Unexpected ForgetContext(%p)", context); + } + + if (Contexts_.empty()) { + if (IsStopping()) { + shutdown = true; + } + ContextsEmpty_.notify_all(); - } - } - - if (shutdown) { - // This was the last context, shutdown CQ + } + } + + if (shutdown) { + // This was the last context, shutdown CQ for (auto& cq : CQS_) { cq->Shutdown(); - } + } } } |