author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:41 +0300
---|---|---
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:41 +0300
commit | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (patch) |
tree | 852611fd27f734847435b37aa5b0ad5d8b1c10ac | /library/cpp/grpc
parent | 667a4ee7da2e004784b9c3cfab824a81e96f4d66 (diff) |
download | ydb-22d92781ba2a10b7fb5b977b7d1a5c40ff53885f.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 574
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 1234
-rw-r--r-- | library/cpp/grpc/client/grpc_common.h | 16
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 12
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 56
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 6
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 12
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 198
8 files changed, 1054 insertions, 1054 deletions
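For orientation, the bulk of the hunks below touch TGRpcClientLow::TContextImpl, which maintains a tree of cancellable contexts: Cancel() runs the subscribed callbacks and fans out to still-live children, while each destructor unregisters from its parent under the parent's mutex, making the raw child pointers safe to lock. The following is a minimal standalone sketch of that pattern, not part of the commit: it omits the inline-children slots, TStackVec, and completion-queue wiring of the real class, and TContext is a hypothetical simplification of TContextImpl.

// Minimal sketch of the context-tree cancellation pattern (assumptions noted above).
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>

class TContext : public std::enable_shared_from_this<TContext> {
public:
    ~TContext() {
        // Unregister from the parent under the parent's mutex; this is what
        // makes the parent's raw child pointers safe to lock in Cancel().
        if (Parent) {
            std::lock_guard<std::mutex> guard(Parent->Mutex);
            Parent->Children.erase(this);
        }
    }

    std::shared_ptr<TContext> CreateContext() {
        auto child = std::make_shared<TContext>();
        child->Parent = shared_from_this();
        std::lock_guard<std::mutex> guard(Mutex);
        Children.insert(child.get());
        // Propagate an already-cancelled state to newly created children
        if (Cancelled.load(std::memory_order_relaxed)) {
            child->Cancelled.store(true, std::memory_order_relaxed);
        }
        return child;
    }

    void SubscribeCancel(std::function<void()> callback) {
        {
            std::lock_guard<std::mutex> guard(Mutex);
            if (!Cancelled.load(std::memory_order_relaxed)) {
                Callbacks.push_back(std::move(callback));
                return;
            }
        }
        callback(); // already cancelled: run immediately
    }

    bool Cancel() {
        std::vector<std::function<void()>> callbacks;
        std::vector<std::shared_ptr<TContext>> children;
        {
            std::lock_guard<std::mutex> guard(Mutex);
            if (Cancelled.exchange(true)) {
                return false; // lost the race to another thread
            }
            callbacks.swap(Callbacks);
            for (TContext* ptr : Children) {
                // lock() returns nullptr only for a child that is already
                // running its destructor; such a child is safe to skip.
                if (auto child = ptr->weak_from_this().lock()) {
                    children.push_back(std::move(child));
                }
            }
        }
        // Run callbacks and fan out to the subtree outside the mutex
        for (auto& cb : callbacks) cb();
        for (auto& child : children) child->Cancel();
        return true;
    }

private:
    std::mutex Mutex;
    std::shared_ptr<TContext> Parent;             // keeps the parent alive
    std::unordered_set<TContext*> Children;       // non-owning back-links
    std::vector<std::function<void()>> Callbacks; // cancel subscribers
    std::atomic<bool> Cancelled{false};
};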
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 73cc908ef8..bba83aee02 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -193,214 +193,214 @@ static void PullEvents(grpc::CompletionQueue* cq) { } } -class TGRpcClientLow::TContextImpl final - : public std::enable_shared_from_this<TContextImpl> - , public IQueueClientContext -{ - friend class TGRpcClientLow; - - using TCallback = std::function<void()>; - using TContextPtr = std::shared_ptr<TContextImpl>; - -public: +class TGRpcClientLow::TContextImpl final + : public std::enable_shared_from_this<TContextImpl> + , public IQueueClientContext +{ + friend class TGRpcClientLow; + + using TCallback = std::function<void()>; + using TContextPtr = std::shared_ptr<TContextImpl>; + +public: ~TContextImpl() override { - Y_VERIFY(CountChildren() == 0, - "Destructor called with non-empty children"); - - if (Parent) { - Parent->ForgetContext(this); - } else if (Y_LIKELY(Owner)) { - Owner->ForgetContext(this); - } - } - - /** - * Helper for locking child pointer from a parent container - */ - static TContextPtr LockChildPtr(TContextImpl* ptr) { - if (ptr) { - // N.B. it is safe to do as long as it's done under a mutex and - // pointer is among valid children. When that's the case we - // know that TContextImpl destructor has not finished yet, so - // the object is valid. The lock() method may return nullptr - // though, if the object is being destructed right now. - return ptr->weak_from_this().lock(); - } else { - return nullptr; - } - } - - void ForgetContext(TContextImpl* child) { - std::unique_lock<std::mutex> guard(Mutex); - - auto removed = RemoveChild(child); - Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child); - } - - IQueueClientContextPtr CreateContext() override { - auto self = shared_from_this(); - auto child = std::make_shared<TContextImpl>(); - - { - std::unique_lock<std::mutex> guard(Mutex); - - AddChild(child.get()); - - // It's now safe to initialize parent and owner - child->Parent = std::move(self); - child->Owner = Owner; + Y_VERIFY(CountChildren() == 0, + "Destructor called with non-empty children"); + + if (Parent) { + Parent->ForgetContext(this); + } else if (Y_LIKELY(Owner)) { + Owner->ForgetContext(this); + } + } + + /** + * Helper for locking child pointer from a parent container + */ + static TContextPtr LockChildPtr(TContextImpl* ptr) { + if (ptr) { + // N.B. it is safe to do as long as it's done under a mutex and + // pointer is among valid children. When that's the case we + // know that TContextImpl destructor has not finished yet, so + // the object is valid. The lock() method may return nullptr + // though, if the object is being destructed right now. 
+ return ptr->weak_from_this().lock(); + } else { + return nullptr; + } + } + + void ForgetContext(TContextImpl* child) { + std::unique_lock<std::mutex> guard(Mutex); + + auto removed = RemoveChild(child); + Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child); + } + + IQueueClientContextPtr CreateContext() override { + auto self = shared_from_this(); + auto child = std::make_shared<TContextImpl>(); + + { + std::unique_lock<std::mutex> guard(Mutex); + + AddChild(child.get()); + + // It's now safe to initialize parent and owner + child->Parent = std::move(self); + child->Owner = Owner; child->CQ = CQ; - - // Propagate cancellation to a child context - if (Cancelled.load(std::memory_order_relaxed)) { - child->Cancelled.store(true, std::memory_order_relaxed); - } - } - - return child; - } - - grpc::CompletionQueue* CompletionQueue() override { - Y_VERIFY(Owner, "Uninitialized context"); + + // Propagate cancellation to a child context + if (Cancelled.load(std::memory_order_relaxed)) { + child->Cancelled.store(true, std::memory_order_relaxed); + } + } + + return child; + } + + grpc::CompletionQueue* CompletionQueue() override { + Y_VERIFY(Owner, "Uninitialized context"); return CQ; - } - - bool IsCancelled() const override { - return Cancelled.load(std::memory_order_acquire); - } - - bool Cancel() override { - TStackVec<TCallback, 1> callbacks; - TStackVec<TContextPtr, 2> children; - - { - std::unique_lock<std::mutex> guard(Mutex); - - if (Cancelled.load(std::memory_order_relaxed)) { - // Already cancelled in another thread - return false; - } - - callbacks.reserve(Callbacks.size()); - children.reserve(CountChildren()); - - for (auto& callback : Callbacks) { - callbacks.emplace_back().swap(callback); - } - Callbacks.clear(); - - // Collect all children we need to cancel - // N.B. we don't clear children links (cleared by destructors) - // N.B. 
some children may be stuck in destructors at the moment - for (TContextImpl* ptr : InlineChildren) { - if (auto child = LockChildPtr(ptr)) { - children.emplace_back(std::move(child)); - } - } - for (auto* ptr : Children) { - if (auto child = LockChildPtr(ptr)) { - children.emplace_back(std::move(child)); - } - } - - Cancelled.store(true, std::memory_order_release); - } - - // Call directly subscribed callbacks - if (callbacks) { - RunCallbacksNoExcept(callbacks); - } - - // Cancel all children - for (auto& child : children) { - child->Cancel(); - child.reset(); - } - - return true; - } - - void SubscribeCancel(TCallback callback) override { - Y_VERIFY(callback, "SubscribeCancel called with an empty callback"); - - { - std::unique_lock<std::mutex> guard(Mutex); - - if (!Cancelled.load(std::memory_order_relaxed)) { - Callbacks.emplace_back().swap(callback); - return; - } - } - - // Already cancelled, run immediately - callback(); - } - -private: - void AddChild(TContextImpl* child) { - for (TContextImpl*& slot : InlineChildren) { - if (!slot) { - slot = child; - return; - } - } - - Children.insert(child); - } - - bool RemoveChild(TContextImpl* child) { - for (TContextImpl*& slot : InlineChildren) { - if (slot == child) { - slot = nullptr; - return true; - } - } - - return Children.erase(child); - } - - size_t CountChildren() { - size_t count = 0; - - for (TContextImpl* ptr : InlineChildren) { - if (ptr) { - ++count; - } - } - - return count + Children.size(); - } - - template<class TCallbacks> - static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept { - for (auto& callback : callbacks) { - if (callback) { - callback(); - callback = nullptr; - } - } - } - -private: - // We want a simple lock here, without extra memory allocations - std::mutex Mutex; - - // These fields are initialized on successful registration - TContextPtr Parent; - TGRpcClientLow* Owner = nullptr; + } + + bool IsCancelled() const override { + return Cancelled.load(std::memory_order_acquire); + } + + bool Cancel() override { + TStackVec<TCallback, 1> callbacks; + TStackVec<TContextPtr, 2> children; + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (Cancelled.load(std::memory_order_relaxed)) { + // Already cancelled in another thread + return false; + } + + callbacks.reserve(Callbacks.size()); + children.reserve(CountChildren()); + + for (auto& callback : Callbacks) { + callbacks.emplace_back().swap(callback); + } + Callbacks.clear(); + + // Collect all children we need to cancel + // N.B. we don't clear children links (cleared by destructors) + // N.B. 
some children may be stuck in destructors at the moment + for (TContextImpl* ptr : InlineChildren) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + for (auto* ptr : Children) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + + Cancelled.store(true, std::memory_order_release); + } + + // Call directly subscribed callbacks + if (callbacks) { + RunCallbacksNoExcept(callbacks); + } + + // Cancel all children + for (auto& child : children) { + child->Cancel(); + child.reset(); + } + + return true; + } + + void SubscribeCancel(TCallback callback) override { + Y_VERIFY(callback, "SubscribeCancel called with an empty callback"); + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (!Cancelled.load(std::memory_order_relaxed)) { + Callbacks.emplace_back().swap(callback); + return; + } + } + + // Already cancelled, run immediately + callback(); + } + +private: + void AddChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (!slot) { + slot = child; + return; + } + } + + Children.insert(child); + } + + bool RemoveChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (slot == child) { + slot = nullptr; + return true; + } + } + + return Children.erase(child); + } + + size_t CountChildren() { + size_t count = 0; + + for (TContextImpl* ptr : InlineChildren) { + if (ptr) { + ++count; + } + } + + return count + Children.size(); + } + + template<class TCallbacks> + static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept { + for (auto& callback : callbacks) { + if (callback) { + callback(); + callback = nullptr; + } + } + } + +private: + // We want a simple lock here, without extra memory allocations + std::mutex Mutex; + + // These fields are initialized on successful registration + TContextPtr Parent; + TGRpcClientLow* Owner = nullptr; grpc::CompletionQueue* CQ = nullptr; - - // Some children are stored inline, others are in a set - std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; + + // Some children are stored inline, others are in a set + std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; std::unordered_set<TContextImpl*> Children; - - // Single callback is stored without extra allocations - TStackVec<TCallback, 1> Callbacks; - - // Atomic flag for a faster IsCancelled() implementation - std::atomic<bool> Cancelled; -}; - + + // Single callback is stored without extra allocations + TStackVec<TCallback, 1> Callbacks; + + // Atomic flag for a faster IsCancelled() implementation + std::atomic<bool> Cancelled; +}; + TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) : UseCompletionQueuePerThread_(useCompletionQueuePerThread) { @@ -408,7 +408,7 @@ TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePe } void TGRpcClientLow::Init(size_t numWorkerThread) { - SetCqState(WORKING); + SetCqState(WORKING); if (UseCompletionQueuePerThread_) { for (size_t i = 0; i < numWorkerThread; i++) { CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); @@ -428,7 +428,7 @@ void TGRpcClientLow::Init(size_t numWorkerThread) { } } -void TGRpcClientLow::AddWorkerThreadForTest() { +void TGRpcClientLow::AddWorkerThreadForTest() { if (UseCompletionQueuePerThread_) { CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); auto* cq = CQS_.back().get(); @@ -441,75 +441,75 @@ void TGRpcClientLow::AddWorkerThreadForTest() { PullEvents(cq); }).Release()); } -} - 
-TGRpcClientLow::~TGRpcClientLow() { - StopInternal(true); - WaitInternal(); -} - -void TGRpcClientLow::Stop(bool wait) { - StopInternal(false); - - if (wait) { - WaitInternal(); +} + +TGRpcClientLow::~TGRpcClientLow() { + StopInternal(true); + WaitInternal(); +} + +void TGRpcClientLow::Stop(bool wait) { + StopInternal(false); + + if (wait) { + WaitInternal(); } } -void TGRpcClientLow::StopInternal(bool silent) { - bool shutdown; - - TVector<TContextImpl::TContextPtr> cancelQueue; - +void TGRpcClientLow::StopInternal(bool silent) { + bool shutdown; + + TVector<TContextImpl::TContextPtr> cancelQueue; + { std::unique_lock<std::mutex> guard(Mtx_); - auto allowStateChange = [&]() { - switch (GetCqState()) { - case WORKING: - return true; - case STOP_SILENT: - return !silent; - case STOP_EXPLICIT: - return false; - } - - Y_UNREACHABLE(); - }; - - if (!allowStateChange()) { - // Completion queue is already stopping - return; - } - - SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); - + auto allowStateChange = [&]() { + switch (GetCqState()) { + case WORKING: + return true; + case STOP_SILENT: + return !silent; + case STOP_EXPLICIT: + return false; + } + + Y_UNREACHABLE(); + }; + + if (!allowStateChange()) { + // Completion queue is already stopping + return; + } + + SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); + if (!silent && !Contexts_.empty()) { - cancelQueue.reserve(Contexts_.size()); - for (auto* ptr : Contexts_) { - // N.B. some contexts may be stuck in destructors - if (auto context = TContextImpl::LockChildPtr(ptr)) { - cancelQueue.emplace_back(std::move(context)); - } - } - } - + cancelQueue.reserve(Contexts_.size()); + for (auto* ptr : Contexts_) { + // N.B. some contexts may be stuck in destructors + if (auto context = TContextImpl::LockChildPtr(ptr)) { + cancelQueue.emplace_back(std::move(context)); + } + } + } + shutdown = Contexts_.empty(); - } - - for (auto& context : cancelQueue) { - context->Cancel(); - context.reset(); - } - - if (shutdown) { + } + + for (auto& context : cancelQueue) { + context->Cancel(); + context.reset(); + } + + if (shutdown) { for (auto& cq : CQS_) { cq->Shutdown(); } } -} - -void TGRpcClientLow::WaitInternal() { +} + +void TGRpcClientLow::WaitInternal() { std::unique_lock<std::mutex> guard(JoinMutex_); for (auto& ti : WorkerThreads_) { @@ -517,7 +517,7 @@ void TGRpcClientLow::WaitInternal() { } } -void TGRpcClientLow::WaitIdle() { +void TGRpcClientLow::WaitIdle() { std::unique_lock<std::mutex> guard(Mtx_); while (!Contexts_.empty()) { @@ -525,7 +525,7 @@ void TGRpcClientLow::WaitIdle() { } } -std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { +std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { std::unique_lock<std::mutex> guard(Mtx_); auto allowCreateContext = [&]() { @@ -535,15 +535,15 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { case STOP_SILENT: case STOP_EXPLICIT: return false; - } - + } + Y_UNREACHABLE(); }; if (!allowCreateContext()) { // New context creation is forbidden return nullptr; - } + } auto context = std::make_shared<TContextImpl>(); Contexts_.insert(context.get()); @@ -554,32 +554,32 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { context->CQ = CQS_[0].get(); } return context; -} - -void TGRpcClientLow::ForgetContext(TContextImpl* context) { - bool shutdown = false; - +} + +void TGRpcClientLow::ForgetContext(TContextImpl* context) { + bool shutdown = false; + { std::unique_lock<std::mutex> guard(Mtx_); - if (!Contexts_.erase(context)) { - 
Y_FAIL("Unexpected ForgetContext(%p)", context); - } - - if (Contexts_.empty()) { - if (IsStopping()) { - shutdown = true; - } - + if (!Contexts_.erase(context)) { + Y_FAIL("Unexpected ForgetContext(%p)", context); + } + + if (Contexts_.empty()) { + if (IsStopping()) { + shutdown = true; + } + ContextsEmpty_.notify_all(); - } - } - - if (shutdown) { - // This was the last context, shutdown CQ + } + } + + if (shutdown) { + // This was the last context, shutdown CQ for (auto& cq : CQS_) { cq->Shutdown(); - } + } } } diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index ab0a0627be..5d0983f804 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -52,125 +52,125 @@ public: virtual void Destroy() = 0; }; -// Implementation of IQueueClientEvent that reduces allocations -template<class TSelf> -class TQueueClientFixedEvent : private IQueueClientEvent { - using TCallback = void (TSelf::*)(bool); - -public: - TQueueClientFixedEvent(TSelf* self, TCallback callback) - : Self(self) - , Callback(callback) - { } - - IQueueClientEvent* Prepare() { - Self->Ref(); - return this; - } - -private: - bool Execute(bool ok) override { - ((*Self).*Callback)(ok); - return false; - } - - void Destroy() override { - Self->UnRef(); - } - -private: - TSelf* const Self; - TCallback const Callback; -}; - -class IQueueClientContext; -using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>; - -// Provider of IQueueClientContext instances -class IQueueClientContextProvider { -public: - virtual ~IQueueClientContextProvider() = default; - - virtual IQueueClientContextPtr CreateContext() = 0; -}; - -// Activity context for a low-level client -class IQueueClientContext : public IQueueClientContextProvider { -public: - virtual ~IQueueClientContext() = default; - - //! Returns CompletionQueue associated with the client - virtual grpc::CompletionQueue* CompletionQueue() = 0; - - //! Returns true if context has been cancelled - virtual bool IsCancelled() const = 0; - - //! Tries to cancel context, calling all registered callbacks - virtual bool Cancel() = 0; - - //! Subscribes callback to cancellation - // - // Note there's no way to unsubscribe, if subscription is temporary - // make sure you create a new context with CreateContext and release - // it as soon as it's no longer needed. - virtual void SubscribeCancel(std::function<void()> callback) = 0; - - //! Subscribes callback to cancellation - // - // This alias is for compatibility with older code. 
- void SubscribeStop(std::function<void()> callback) { - SubscribeCancel(std::move(callback)); - } -}; - +// Implementation of IQueueClientEvent that reduces allocations +template<class TSelf> +class TQueueClientFixedEvent : private IQueueClientEvent { + using TCallback = void (TSelf::*)(bool); + +public: + TQueueClientFixedEvent(TSelf* self, TCallback callback) + : Self(self) + , Callback(callback) + { } + + IQueueClientEvent* Prepare() { + Self->Ref(); + return this; + } + +private: + bool Execute(bool ok) override { + ((*Self).*Callback)(ok); + return false; + } + + void Destroy() override { + Self->UnRef(); + } + +private: + TSelf* const Self; + TCallback const Callback; +}; + +class IQueueClientContext; +using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>; + +// Provider of IQueueClientContext instances +class IQueueClientContextProvider { +public: + virtual ~IQueueClientContextProvider() = default; + + virtual IQueueClientContextPtr CreateContext() = 0; +}; + +// Activity context for a low-level client +class IQueueClientContext : public IQueueClientContextProvider { +public: + virtual ~IQueueClientContext() = default; + + //! Returns CompletionQueue associated with the client + virtual grpc::CompletionQueue* CompletionQueue() = 0; + + //! Returns true if context has been cancelled + virtual bool IsCancelled() const = 0; + + //! Tries to cancel context, calling all registered callbacks + virtual bool Cancel() = 0; + + //! Subscribes callback to cancellation + // + // Note there's no way to unsubscribe, if subscription is temporary + // make sure you create a new context with CreateContext and release + // it as soon as it's no longer needed. + virtual void SubscribeCancel(std::function<void()> callback) = 0; + + //! Subscribes callback to cancellation + // + // This alias is for compatibility with older code. 
+ void SubscribeStop(std::function<void()> callback) { + SubscribeCancel(std::move(callback)); + } +}; + // Represents grpc status and error message string struct TGrpcStatus { - TString Msg; + TString Msg; TString Details; - int GRpcStatusCode; - bool InternalError; - - TGrpcStatus() - : GRpcStatusCode(grpc::StatusCode::OK) - , InternalError(false) - { } - - TGrpcStatus(TString msg, int statusCode, bool internalError) - : Msg(std::move(msg)) - , GRpcStatusCode(statusCode) - , InternalError(internalError) - { } - + int GRpcStatusCode; + bool InternalError; + + TGrpcStatus() + : GRpcStatusCode(grpc::StatusCode::OK) + , InternalError(false) + { } + + TGrpcStatus(TString msg, int statusCode, bool internalError) + : Msg(std::move(msg)) + , GRpcStatusCode(statusCode) + , InternalError(internalError) + { } + TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {}) - : Msg(std::move(msg)) + : Msg(std::move(msg)) , Details(std::move(details)) - , GRpcStatusCode(status) - , InternalError(false) - { } - - TGrpcStatus(const grpc::Status& status) + , GRpcStatusCode(status) + , InternalError(false) + { } + + TGrpcStatus(const grpc::Status& status) : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details())) - { } - - TGrpcStatus& operator=(const grpc::Status& status) { - Msg = TString(status.error_message()); + { } + + TGrpcStatus& operator=(const grpc::Status& status) { + Msg = TString(status.error_message()); Details = TString(status.error_details()); - GRpcStatusCode = status.error_code(); - InternalError = false; - return *this; - } - - static TGrpcStatus Internal(TString msg) { - return { std::move(msg), -1, true }; - } - - bool Ok() const { - return !InternalError && GRpcStatusCode == grpc::StatusCode::OK; - } + GRpcStatusCode = status.error_code(); + InternalError = false; + return *this; + } + + static TGrpcStatus Internal(TString msg) { + return { std::move(msg), -1, true }; + } + + bool Ok() const { + return !InternalError && GRpcStatusCode == grpc::StatusCode::OK; + } }; bool inline IsGRpcStatusGood(const TGrpcStatus& status) { - return status.Ok(); + return status.Ok(); } // Response callback type - this callback will be called when request is finished @@ -241,9 +241,9 @@ public: { } ~TSimpleRequestProcessor() { - if (!Replied_ && Callback_) { - Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible + if (!Replied_ && Callback_) { + Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); + Callback_ = nullptr; // free resources as early as possible } } @@ -251,53 +251,53 @@ public: { std::unique_lock<std::mutex> guard(Mutex_); LocalContext.reset(); - } - TGrpcStatus status; - if (ok) { + } + TGrpcStatus status; + if (ok) { status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); } Replied_ = true; Callback_(std::move(status), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible + Callback_ = nullptr; // free resources as early as possible return false; } void Destroy() override { - UnRef(); + UnRef(); } private: - IQueueClientEvent* FinishedEvent() { - Ref(); - return this; + IQueueClientEvent* FinishedEvent() { + Ref(); + return this; } void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { - auto context = provider->CreateContext(); - if (!context) 
{ - Replied_ = true; - Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); - Callback_ = nullptr; - return; - } + auto context = provider->CreateContext(); + if (!context) { + Replied_ = true; + Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); + Callback_ = nullptr; + return; + } { std::unique_lock<std::mutex> guard(Mutex_); LocalContext = context; Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); Reader_->Finish(&Reply_, &Status, FinishedEvent()); - } - context->SubscribeStop([self = TPtr(this)] { - self->Stop(); - }); - } - - void Stop() { + } + context->SubscribeStop([self = TPtr(this)] { + self->Stop(); + }); + } + + void Stop() { Context.TryCancel(); - } - - TResponseCallback<TResponse> Callback_; + } + + TResponseCallback<TResponse> Callback_; TResponse Reply_; std::mutex Mutex_; TAsyncReaderPtr Reader_; @@ -387,49 +387,49 @@ private: template<class TResponse> class IStreamRequestReadProcessor : public TThrRefBase { -public: +public: using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>; - using TReadCallback = std::function<void(TGrpcStatus&&)>; - - /** - * Asynchronously cancel the request - */ - virtual void Cancel() = 0; - - /** + using TReadCallback = std::function<void(TGrpcStatus&&)>; + + /** + * Asynchronously cancel the request + */ + virtual void Cancel() = 0; + + /** * Scheduled initial server metadata read from the stream */ virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0; /** - * Scheduled response read from the stream - * Callback will be called with the status if it failed - * Only one Read or Finish call may be active at a time - */ - virtual void Read(TResponse* response, TReadCallback callback) = 0; - - /** - * Stop reading and gracefully finish the stream - * Only one Read or Finish call may be active at a time - */ - virtual void Finish(TReadCallback callback) = 0; - - /** - * Additional callback to be called when stream has finished - */ - virtual void AddFinishedCallback(TReadCallback callback) = 0; -}; - + * Scheduled response read from the stream + * Callback will be called with the status if it failed + * Only one Read or Finish call may be active at a time + */ + virtual void Read(TResponse* response, TReadCallback callback) = 0; + + /** + * Stop reading and gracefully finish the stream + * Only one Read or Finish call may be active at a time + */ + virtual void Finish(TReadCallback callback) = 0; + + /** + * Additional callback to be called when stream has finished + */ + virtual void AddFinishedCallback(TReadCallback callback) = 0; +}; + template<class TRequest, class TResponse> class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> { public: using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>; - using TWriteCallback = std::function<void(TGrpcStatus&&)>; + using TWriteCallback = std::function<void(TGrpcStatus&&)>; /** * Scheduled request write to the stream */ - virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; + virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; }; class TGRpcKeepAliveSocketMutator; @@ -548,7 +548,7 @@ public: { std::unique_lock<std::mutex> guard(Mutex); Cancelled = true; - if (Started && !ReadFinished) { + if (Started && !ReadFinished) { if (!ReadActive) { ReadFinished = true; } @@ -640,31 +640,31 @@ public: callback(std::move(status)); } - - void 
AddFinishedCallback(TReadCallback callback) override { - Y_VERIFY(callback, "Unexpected empty callback"); - - TGrpcStatus status; - + + void AddFinishedCallback(TReadCallback callback) override { + Y_VERIFY(callback, "Unexpected empty callback"); + + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (!Finished) { - FinishedCallbacks.emplace_back().swap(callback); - return; - } - - if (FinishedOk) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - + if (!Finished) { + FinishedCallbacks.emplace_back().swap(callback); + return; + } + + if (FinishedOk) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + private: void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { auto context = provider->CreateContext(); @@ -727,8 +727,8 @@ private: { std::unique_lock<std::mutex> guard(Mutex); - Started = true; - if (!ok || Cancelled) { + Started = true; + if (!ok || Cancelled) { ReadFinished = true; Stream->Finish(&Status, OnFinishedTag.Prepare()); return; @@ -743,7 +743,7 @@ private: void OnFinished(bool ok) { TGrpcStatus status; std::vector<TReadCallback> finishedCallbacks; - TReaderCallback startCallback; + TReaderCallback startCallback; TReadCallback readCallback; TReadCallback finishCallback; @@ -756,19 +756,19 @@ private: if (ok) { status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); } else { status = TGrpcStatus::Internal("Unexpected error"); } - finishedCallbacks.swap(FinishedCallbacks); - - if (Callback) { - Y_VERIFY(!ReadActive); - startCallback = std::move(Callback); - Callback = nullptr; - } else if (ReadActive) { + finishedCallbacks.swap(FinishedCallbacks); + + if (Callback) { + Y_VERIFY(!ReadActive); + startCallback = std::move(Callback); + Callback = nullptr; + } else if (ReadActive) { if (ReadCallback) { readCallback = std::move(ReadCallback); ReadCallback = nullptr; @@ -780,18 +780,18 @@ private: } } - for (auto& finishedCallback : finishedCallbacks) { - auto statusCopy = status; - finishedCallback(std::move(statusCopy)); - } - - if (startCallback) { - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); - } - startCallback(std::move(status), nullptr); - } else if (readCallback) { + for (auto& finishedCallback : finishedCallbacks) { + auto statusCopy = status; + finishedCallback(std::move(statusCopy)); + } + + if (startCallback) { if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); + } + startCallback(std::move(status), nullptr); + } else if (readCallback) { + if (status.Ok()) { status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); } readCallback(std::move(status)); @@ -812,7 +812,7 @@ private: TReadCallback FinishCallback; std::vector<TReadCallback> FinishedCallbacks; std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; - bool Started = false; + bool Started = false; bool HasInitialMetadata = false; bool ReadActive = false; bool ReadFinished = false; @@ -821,72 +821,72 @@ 
private: bool FinishedOk = false; }; -template<class TRequest, class TResponse> +template<class TRequest, class TResponse> using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>; - -template<class TStub, class TRequest, class TResponse> + +template<class TStub, class TRequest, class TResponse> class TStreamRequestReadWriteProcessor : public IStreamRequestReadWriteProcessor<TRequest, TResponse> , public TGRpcRequestProcessorCommon { -public: +public: using TSelf = TStreamRequestReadWriteProcessor; using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>; - using TPtr = TIntrusivePtr<TSelf>; - using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>; - using TReadCallback = typename TBase::TReadCallback; - using TWriteCallback = typename TBase::TWriteCallback; - using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>; - using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); - + using TPtr = TIntrusivePtr<TSelf>; + using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>; + using TReadCallback = typename TBase::TReadCallback; + using TWriteCallback = typename TBase::TWriteCallback; + using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>; + using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); + explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback) - : ConnectedCallback(std::move(callback)) - { - Y_VERIFY(ConnectedCallback, "Missing connected callback"); - } - - void Cancel() override { - Context.TryCancel(); - + : ConnectedCallback(std::move(callback)) + { + Y_VERIFY(ConnectedCallback, "Missing connected callback"); + } + + void Cancel() override { + Context.TryCancel(); + { std::unique_lock<std::mutex> guard(Mutex); - Cancelled = true; - if (Started && !(ReadFinished && WriteFinished)) { - if (!ReadActive) { - ReadFinished = true; - } - if (!WriteActive) { - WriteFinished = true; - } - if (ReadFinished && WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - } - } - - void Write(TRequest&& request, TWriteCallback callback) override { - TGrpcStatus status; - + Cancelled = true; + if (Started && !(ReadFinished && WriteFinished)) { + if (!ReadActive) { + ReadFinished = true; + } + if (!WriteActive) { + WriteFinished = true; + } + if (ReadFinished && WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + } + } + + void Write(TRequest&& request, TWriteCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (Cancelled || ReadFinished || WriteFinished) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); - } else if (WriteActive) { - auto& item = WriteQueue.emplace_back(); - item.Callback.swap(callback); - item.Request.Swap(&request); - } else { - WriteActive = true; - WriteCallback.swap(callback); - Stream->Write(request, OnWriteDoneTag.Prepare()); - } - } - - if (!status.Ok() && callback) { - callback(std::move(status)); - } - } - + if (Cancelled || ReadFinished || WriteFinished) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); + } else if (WriteActive) { + auto& item = WriteQueue.emplace_back(); + item.Callback.swap(callback); + item.Request.Swap(&request); + } else { + WriteActive = true; + 
WriteCallback.swap(callback); + Stream->Write(request, OnWriteDoneTag.Prepare()); + } + } + + if (!status.Ok() && callback) { + callback(std::move(status)); + } + } + void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { TGrpcStatus status; @@ -916,321 +916,321 @@ public: callback(std::move(status)); } - void Read(TResponse* message, TReadCallback callback) override { - TGrpcStatus status; - + void Read(TResponse* message, TReadCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - ReadCallback = std::move(callback); - if (!ReadFinished) { - Stream->Read(message, OnReadDoneTag.Prepare()); - } - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - - callback(std::move(status)); - } - - void Finish(TReadCallback callback) override { - TGrpcStatus status; - + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + ReadCallback = std::move(callback); + if (!ReadFinished) { + Stream->Read(message, OnReadDoneTag.Prepare()); + } + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + + callback(std::move(status)); + } + + void Finish(TReadCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - FinishCallback = std::move(callback); - if (!ReadFinished) { - ReadFinished = true; - if (!WriteActive) { - WriteFinished = true; - } - if (WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - - void AddFinishedCallback(TReadCallback callback) override { - Y_VERIFY(callback, "Unexpected empty callback"); - - TGrpcStatus status; - + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + FinishCallback = std::move(callback); + if (!ReadFinished) { + ReadFinished = true; + if (!WriteActive) { + WriteFinished = true; + } + if (WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + + void AddFinishedCallback(TReadCallback callback) override { + Y_VERIFY(callback, "Unexpected empty callback"); + + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (!Finished) { - FinishedCallbacks.emplace_back().swap(callback); - return; - } - - if (FinishedOk) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - -private: - template<typename> friend class TServiceConnection; - - void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { - auto context = 
provider->CreateContext(); - if (!context) { - auto callback = std::move(ConnectedCallback); - TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); - callback(std::move(status), nullptr); - return; - } - + if (!Finished) { + FinishedCallbacks.emplace_back().swap(callback); + return; + } + + if (FinishedOk) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + +private: + template<typename> friend class TServiceConnection; + + void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { + auto context = provider->CreateContext(); + if (!context) { + auto callback = std::move(ConnectedCallback); + TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); + callback(std::move(status), nullptr); + return; + } + { std::unique_lock<std::mutex> guard(Mutex); - LocalContext = context; - Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare()); - } - - context->SubscribeStop([self = TPtr(this)] { - self->Cancel(); - }); - } - -private: - void OnConnected(bool ok) { - TConnectedCallback callback; - + LocalContext = context; + Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare()); + } + + context->SubscribeStop([self = TPtr(this)] { + self->Cancel(); + }); + } + +private: + void OnConnected(bool ok) { + TConnectedCallback callback; + { std::unique_lock<std::mutex> guard(Mutex); - Started = true; - if (!ok || Cancelled) { - ReadFinished = true; - WriteFinished = true; - Stream->Finish(&Status, OnFinishedTag.Prepare()); - return; - } - - callback = std::move(ConnectedCallback); - ConnectedCallback = nullptr; - } - - callback({ }, typename TBase::TPtr(this)); - } - - void OnReadDone(bool ok) { - TGrpcStatus status; - TReadCallback callback; + Started = true; + if (!ok || Cancelled) { + ReadFinished = true; + WriteFinished = true; + Stream->Finish(&Status, OnFinishedTag.Prepare()); + return; + } + + callback = std::move(ConnectedCallback); + ConnectedCallback = nullptr; + } + + callback({ }, typename TBase::TPtr(this)); + } + + void OnReadDone(bool ok) { + TGrpcStatus status; + TReadCallback callback; std::unordered_multimap<TString, TString>* initialMetadata = nullptr; - + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(ReadActive, "Unexpected Read done callback"); - Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); - - if (!ok || Cancelled || WriteFinished) { - ReadFinished = true; - if (!WriteActive) { - WriteFinished = true; - } - if (WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - if (!ok) { - // Keep ReadActive=true, so callback is called - // after the call is finished with an error - return; - } - } - - callback = std::move(ReadCallback); - ReadCallback = nullptr; - ReadActive = false; + Y_VERIFY(ReadActive, "Unexpected Read done callback"); + Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); + + if (!ok || Cancelled || WriteFinished) { + ReadFinished = true; + if (!WriteActive) { + WriteFinished = true; + } + if (WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + if (!ok) { + // Keep ReadActive=true, so callback is called + // after the call is finished with an error + return; + } + } + + callback = std::move(ReadCallback); + ReadCallback = nullptr; + ReadActive = false; initialMetadata = 
InitialMetadata; InitialMetadata = nullptr; HasInitialMetadata = true; - } - + } + if (initialMetadata) { GetInitialMetadata(initialMetadata); } - callback(std::move(status)); - } - - void OnWriteDone(bool ok) { - TWriteCallback okCallback; - + callback(std::move(status)); + } + + void OnWriteDone(bool ok) { + TWriteCallback okCallback; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(WriteActive, "Unexpected Write done callback"); - Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); - - if (ok) { - okCallback.swap(WriteCallback); - } else if (WriteCallback) { - // Put callback back on the queue until OnFinished - auto& item = WriteQueue.emplace_front(); - item.Callback.swap(WriteCallback); - } - - if (!ok || Cancelled) { - WriteActive = false; - WriteFinished = true; - if (!ReadActive) { - ReadFinished = true; - } - if (ReadFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } + Y_VERIFY(WriteActive, "Unexpected Write done callback"); + Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); + + if (ok) { + okCallback.swap(WriteCallback); + } else if (WriteCallback) { + // Put callback back on the queue until OnFinished + auto& item = WriteQueue.emplace_front(); + item.Callback.swap(WriteCallback); + } + + if (!ok || Cancelled) { + WriteActive = false; + WriteFinished = true; + if (!ReadActive) { + ReadFinished = true; + } + if (ReadFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } } else if (!WriteQueue.empty()) { - WriteCallback.swap(WriteQueue.front().Callback); - Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare()); - WriteQueue.pop_front(); - } else { - WriteActive = false; - if (ReadFinished) { - WriteFinished = true; - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - } - - if (okCallback) { - okCallback(TGrpcStatus()); - } - } - - void OnFinished(bool ok) { - TGrpcStatus status; + WriteCallback.swap(WriteQueue.front().Callback); + Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare()); + WriteQueue.pop_front(); + } else { + WriteActive = false; + if (ReadFinished) { + WriteFinished = true; + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + } + + if (okCallback) { + okCallback(TGrpcStatus()); + } + } + + void OnFinished(bool ok) { + TGrpcStatus status; std::deque<TWriteItem> writesDropped; std::vector<TReadCallback> finishedCallbacks; - TConnectedCallback connectedCallback; - TReadCallback readCallback; - TReadCallback finishCallback; - + TConnectedCallback connectedCallback; + TReadCallback readCallback; + TReadCallback finishCallback; + { std::unique_lock<std::mutex> guard(Mutex); - Finished = true; - FinishedOk = ok; - LocalContext.reset(); - - if (ok) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - - writesDropped.swap(WriteQueue); - finishedCallbacks.swap(FinishedCallbacks); - - if (ConnectedCallback) { - Y_VERIFY(!ReadActive); - connectedCallback = std::move(ConnectedCallback); - ConnectedCallback = nullptr; - } else if (ReadActive) { - if (ReadCallback) { - readCallback = std::move(ReadCallback); - ReadCallback = nullptr; - } else { - finishCallback = std::move(FinishCallback); - FinishCallback = nullptr; - } - ReadActive = false; - } - } - - for (auto& item : writesDropped) { - if (item.Callback) { - TGrpcStatus writeStatus = status; - if (writeStatus.Ok()) { - writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request 
dropped"); - } - item.Callback(std::move(writeStatus)); - } - } - - for (auto& finishedCallback : finishedCallbacks) { - TGrpcStatus statusCopy = status; - finishedCallback(std::move(statusCopy)); - } - - if (connectedCallback) { - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); - } - connectedCallback(std::move(status), nullptr); - } else if (readCallback) { - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - readCallback(std::move(status)); - } else if (finishCallback) { - finishCallback(std::move(status)); - } - } - -private: - struct TWriteItem { - TWriteCallback Callback; - TRequest Request; - }; - -private: - using TFixedEvent = TQueueClientFixedEvent<TSelf>; - - TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected }; - TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; - TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone }; - TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; - -private: + Finished = true; + FinishedOk = ok; + LocalContext.reset(); + + if (ok) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + + writesDropped.swap(WriteQueue); + finishedCallbacks.swap(FinishedCallbacks); + + if (ConnectedCallback) { + Y_VERIFY(!ReadActive); + connectedCallback = std::move(ConnectedCallback); + ConnectedCallback = nullptr; + } else if (ReadActive) { + if (ReadCallback) { + readCallback = std::move(ReadCallback); + ReadCallback = nullptr; + } else { + finishCallback = std::move(FinishCallback); + FinishCallback = nullptr; + } + ReadActive = false; + } + } + + for (auto& item : writesDropped) { + if (item.Callback) { + TGrpcStatus writeStatus = status; + if (writeStatus.Ok()) { + writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); + } + item.Callback(std::move(writeStatus)); + } + } + + for (auto& finishedCallback : finishedCallbacks) { + TGrpcStatus statusCopy = status; + finishedCallback(std::move(statusCopy)); + } + + if (connectedCallback) { + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); + } + connectedCallback(std::move(status), nullptr); + } else if (readCallback) { + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + readCallback(std::move(status)); + } else if (finishCallback) { + finishCallback(std::move(status)); + } + } + +private: + struct TWriteItem { + TWriteCallback Callback; + TRequest Request; + }; + +private: + using TFixedEvent = TQueueClientFixedEvent<TSelf>; + + TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected }; + TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; + TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone }; + TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; + +private: std::mutex Mutex; - TAsyncReaderWriterPtr Stream; - TConnectedCallback ConnectedCallback; - TReadCallback ReadCallback; - TReadCallback FinishCallback; + TAsyncReaderWriterPtr Stream; + TConnectedCallback ConnectedCallback; + TReadCallback ReadCallback; + TReadCallback FinishCallback; std::vector<TReadCallback> FinishedCallbacks; std::deque<TWriteItem> WriteQueue; - TWriteCallback WriteCallback; + TWriteCallback WriteCallback; std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; - bool Started = false; + bool Started = false; bool HasInitialMetadata = false; - bool 
ReadActive = false; - bool ReadFinished = false; - bool WriteActive = false; - bool WriteFinished = false; - bool Finished = false; - bool Cancelled = false; - bool FinishedOk = false; -}; - + bool ReadActive = false; + bool ReadFinished = false; + bool WriteActive = false; + bool WriteFinished = false; + bool Finished = false; + bool Cancelled = false; + bool FinishedOk = false; +}; + class TGRpcClientLow; template<typename TGRpcService> @@ -1245,9 +1245,9 @@ public: template<typename TRequest, typename TResponse> void DoRequest(const TRequest& request, TResponseCallback<TResponse> callback, - typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, + typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) + IQueueClientContextProvider* provider = nullptr) { auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); processor->ApplyMeta(metas); @@ -1282,31 +1282,31 @@ public: processor->ApplyMeta(metas); processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_); } - + /* * Start streaming response reading (one request, many responses) */ - template<typename TRequest, typename TResponse> + template<typename TRequest, typename TResponse> void DoStreamRequest(const TRequest& request, TStreamReaderCallback<TResponse> callback, typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, - const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) - { + const TCallMeta& metas = { }, + IQueueClientContextProvider* provider = nullptr) + { auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback)); - processor->ApplyMeta(metas); + processor->ApplyMeta(metas); processor->Start(*Stub_, request, std::move(asyncRequest), provider ? 
provider : Provider_); - } - + } + private: TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci, - IQueueClientContextProvider* provider) + IQueueClientContextProvider* provider) : Stub_(TGRpcService::NewStub(ci)) - , Provider_(provider) - { - Y_VERIFY(Provider_, "Connection does not have a queue provider"); - } - + , Provider_(provider) + { + Y_VERIFY(Provider_, "Connection does not have a queue provider"); + } + TServiceConnection(TStubsHolder& holder, IQueueClientContextProvider* provider) : Stub_(holder.GetOrCreateStub<TStub>()) @@ -1316,47 +1316,47 @@ private: } std::shared_ptr<TStub> Stub_; - IQueueClientContextProvider* Provider_; + IQueueClientContextProvider* Provider_; }; class TGRpcClientLow : public IQueueClientContextProvider { - class TContextImpl; - friend class TContextImpl; - - enum ECqState : TAtomicBase { - WORKING = 0, - STOP_SILENT = 1, - STOP_EXPLICIT = 2, - }; - + class TContextImpl; + friend class TContextImpl; + + enum ECqState : TAtomicBase { + WORKING = 0, + STOP_SILENT = 1, + STOP_EXPLICIT = 2, + }; + public: explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); ~TGRpcClientLow(); - // Tries to stop all currently running requests (via their stop callbacks) - // Will shutdown CQ and drain events once all requests have finished - // No new requests may be started after this call - void Stop(bool wait = false); - - // Waits until all currently running requests finish execution - void WaitIdle(); - - inline bool IsStopping() const { - switch (GetCqState()) { - case WORKING: - return false; - case STOP_SILENT: - case STOP_EXPLICIT: - return true; - } - - Y_UNREACHABLE(); - } - - IQueueClientContextPtr CreateContext() override; - + // Tries to stop all currently running requests (via their stop callbacks) + // Will shutdown CQ and drain events once all requests have finished + // No new requests may be started after this call + void Stop(bool wait = false); + + // Waits until all currently running requests finish execution + void WaitIdle(); + + inline bool IsStopping() const { + switch (GetCqState()) { + case WORKING: + return false; + case STOP_SILENT: + case STOP_EXPLICIT: + return true; + } + + Y_UNREACHABLE(); + } + + IQueueClientContextPtr CreateContext() override; + template<typename TGRpcService> std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) { return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this)); @@ -1367,32 +1367,32 @@ public: return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this)); } - // Tests only, not thread-safe - void AddWorkerThreadForTest(); - + // Tests only, not thread-safe + void AddWorkerThreadForTest(); + private: using IThreadRef = std::unique_ptr<IThreadFactory::IThread>; using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; void Init(size_t numWorkerThread); - inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } - inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } - - void StopInternal(bool silent); - void WaitInternal(); - - void ForgetContext(TContextImpl* context); - -private: + inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } + inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } + + void StopInternal(bool silent); + void WaitInternal(); + + void ForgetContext(TContextImpl* 
context); + +private: bool UseCompletionQueuePerThread_; std::vector<CompletionQueueRef> CQS_; std::vector<IThreadRef> WorkerThreads_; - TAtomic CqState_ = -1; - + TAtomic CqState_ = -1; + std::mutex Mtx_; std::condition_variable ContextsEmpty_; std::unordered_set<TContextImpl*> Contexts_; - + std::mutex JoinMutex_; }; diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h index ffcdafe045..65f74cda2f 100644 --- a/library/cpp/grpc/client/grpc_common.h +++ b/library/cpp/grpc/client/grpc_common.h @@ -24,8 +24,8 @@ struct TGRpcClientConfig { ui64 MemQuota = 0; std::unordered_map<TString, TString> StringChannelParams; std::unordered_map<TString, int> IntChannelParams; - TString LoadBalancingPolicy = { }; - TString SslTargetNameOverride = { }; + TString LoadBalancingPolicy = { }; + TString SslTargetNameOverride = { }; TGRpcClientConfig() = default; TGRpcClientConfig(const TGRpcClientConfig&) = default; @@ -68,12 +68,12 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp if (mutator) { args.SetSocketMutator(mutator); } - if (!config.LoadBalancingPolicy.empty()) { - args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy); - } - if (!config.SslTargetNameOverride.empty()) { - args.SetSslTargetNameOverride(config.SslTargetNameOverride); - } + if (!config.LoadBalancingPolicy.empty()) { + args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy); + } + if (!config.SslTargetNameOverride.empty()) { + args.SetSslTargetNameOverride(config.SslTargetNameOverride); + } if (config.EnableSsl || config.SslCaCert) { return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args); } else { diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index 0b6c36c84c..e86299cbd9 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -83,13 +83,13 @@ public: } void CountRequestBytes(ui32 requestSize) override { - *RequestBytes += requestSize; - } - + *RequestBytes += requestSize; + } + void CountResponseBytes(ui32 responseSize) override { - *ResponseBytes += responseSize; - } - + *ResponseBytes += responseSize; + } + void StartProcessing(ui32 requestSize) override { TotalCounter->Inc(); InflyCounter->Inc(); diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 5bd8d3902b..1fcd8b6655 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -117,24 +117,24 @@ public: return TString(this->Context.peer()); } - bool SslServer() const override { - return Server_->SslServer(); - } - + bool SslServer() const override { + return Server_->SslServer(); + } + void Run() { - // Start request unless server is shutting down - if (auto guard = Server_->ProtectShutdown()) { - Ref(); //For grpc c runtime - this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare()); - if (RequestCallback_) { - (this->Service->*RequestCallback_) - (&this->Context, Request_, - reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag()); - } else { - (this->Service->*StreamRequestCallback_) - (&this->Context, Request_, - reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag()); - } + // Start request unless server is shutting down + if (auto guard = Server_->ProtectShutdown()) { + Ref(); //For grpc c runtime + 
this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare()); + if (RequestCallback_) { + (this->Service->*RequestCallback_) + (&this->Context, Request_, + reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag()); + } else { + (this->Service->*StreamRequestCallback_) + (&this->Context, Request_, + reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag()); + } } } @@ -148,10 +148,10 @@ public: } void DestroyRequest() override { - if (RequestRegistered_) { - Server_->DeregisterRequestCtx(this); - RequestRegistered_ = false; - } + if (RequestRegistered_) { + Server_->DeregisterRequestCtx(this); + RequestRegistered_ = false; + } UnRef(); } @@ -346,15 +346,15 @@ private: ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); if (this->Context.c_call() == nullptr) { - Y_VERIFY(!ok); + Y_VERIFY(!ok); // One ref by OnFinishTag, grpc will not call this tag if no request received UnRef(); - } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { - // Request cannot be registered due to shutdown - // It's unsafe to continue, so drop this request without processing + } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { + // Request cannot be registered due to shutdown + // It's unsafe to continue, so drop this request without processing GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); - this->Context.TryCancel(); - return false; + this->Context.TryCancel(); + return false; } Clone(); // TODO: Request pool? @@ -501,7 +501,7 @@ private: ui32 ResponseStatus = 0; THPTimer RequestTimer; TAuthState AuthState_ = 0; - bool RequestRegistered_ = false; + bool RequestRegistered_ = false; using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index fcfce1c181..dc293ec01a 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -108,9 +108,9 @@ public: //! Returns peer address virtual TString GetPeer() const = 0; - - //! Returns true if server is using ssl - virtual bool SslServer() const = 0; + + //! 
Returns true if server is using ssl + virtual bool SslServer() const = 0; }; } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 7437b7a8f5..9bc8305390 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -77,7 +77,7 @@ void TGRpcServer::Start() { builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); builder.SetMaxSendMessageSize(Options_.MaxMessageSize); for (IGRpcServicePtr service : Services_) { - service->SetServerOptions(Options_); + service->SetServerOptions(Options_); builder.RegisterService(service->GetService()); service->SetGlobalLimiterHandle(&Limiter_); } @@ -192,14 +192,14 @@ void TGRpcServer::Stop() { } for (ui64 attempt = 0; ; ++attempt) { - bool unsafe = false; + bool unsafe = false; size_t infly = 0; for (auto& service : Services_) { - unsafe |= service->IsUnsafeToShutdown(); - infly += service->RequestsInProgress(); + unsafe |= service->IsUnsafeToShutdown(); + infly += service->RequestsInProgress(); } - if (!unsafe && !infly) + if (!unsafe && !infly) break; auto spent = (TInstant::Now() - now).SecondsFloat(); @@ -208,7 +208,7 @@ void TGRpcServer::Stop() { Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl; } - if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) + if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) break; Sleep(TDuration::MilliSeconds(10)); } diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index d6814a90a0..59ed364bc9 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -167,77 +167,77 @@ public: virtual void StopService() noexcept = 0; virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; - virtual bool IsUnsafeToShutdown() const = 0; - virtual size_t RequestsInProgress() const = 0; - - /** - * Called before service is added to the server builder. This allows - * service to inspect server options and initialize accordingly. - */ - virtual void SetServerOptions(const TServerOptions& options) = 0; + virtual bool IsUnsafeToShutdown() const = 0; + virtual size_t RequestsInProgress() const = 0; + + /** + * Called before service is added to the server builder. This allows + * service to inspect server options and initialize accordingly. 
+ */
+ virtual void SetServerOptions(const TServerOptions& options) = 0;
};
template<typename T>
class TGrpcServiceBase: public IGRpcService {
public:
- class TShutdownGuard {
- using TOwner = TGrpcServiceBase<T>;
- friend class TGrpcServiceBase<T>;
-
- public:
- TShutdownGuard()
- : Owner(nullptr)
- { }
-
- ~TShutdownGuard() {
- Release();
- }
-
- TShutdownGuard(TShutdownGuard&& other)
- : Owner(other.Owner)
- {
- other.Owner = nullptr;
- }
-
- TShutdownGuard& operator=(TShutdownGuard&& other) {
- if (Y_LIKELY(this != &other)) {
- Release();
- Owner = other.Owner;
- other.Owner = nullptr;
- }
- return *this;
- }
-
- explicit operator bool() const {
- return bool(Owner);
- }
-
- void Release() {
- if (Owner) {
- AtomicDecrement(Owner->GuardCount_);
- Owner = nullptr;
- }
- }
-
- TShutdownGuard(const TShutdownGuard&) = delete;
- TShutdownGuard& operator=(const TShutdownGuard&) = delete;
-
- private:
- explicit TShutdownGuard(TOwner* owner)
- : Owner(owner)
- { }
-
- private:
- TOwner* Owner;
- };
-
-public:
+ class TShutdownGuard {
+ using TOwner = TGrpcServiceBase<T>;
+ friend class TGrpcServiceBase<T>;
+
+ public:
+ TShutdownGuard()
+ : Owner(nullptr)
+ { }
+
+ ~TShutdownGuard() {
+ Release();
+ }
+
+ TShutdownGuard(TShutdownGuard&& other)
+ : Owner(other.Owner)
+ {
+ other.Owner = nullptr;
+ }
+
+ TShutdownGuard& operator=(TShutdownGuard&& other) {
+ if (Y_LIKELY(this != &other)) {
+ Release();
+ Owner = other.Owner;
+ other.Owner = nullptr;
+ }
+ return *this;
+ }
+
+ explicit operator bool() const {
+ return bool(Owner);
+ }
+
+ void Release() {
+ if (Owner) {
+ AtomicDecrement(Owner->GuardCount_);
+ Owner = nullptr;
+ }
+ }
+
+ TShutdownGuard(const TShutdownGuard&) = delete;
+ TShutdownGuard& operator=(const TShutdownGuard&) = delete;
+
+ private:
+ explicit TShutdownGuard(TOwner* owner)
+ : Owner(owner)
+ { }
+
+ private:
+ TOwner* Owner;
+ };
+
+public:
 using TCurrentGRpcService = T;
 void StopService() noexcept override {
 with_lock(Lock_) {
- AtomicSet(ShuttingDown_, 1);
-
+ AtomicSet(ShuttingDown_, 1);
+ // Send TryCancel to each request (can be sent after finishing). 
// Actual dtors will be called from the grpc thread, so a deadlock is impossible
 for (auto* request : Requests_) {
@@ -246,21 +246,21 @@ public:
 }
 }
- TShutdownGuard ProtectShutdown() noexcept {
- AtomicIncrement(GuardCount_);
- if (IsShuttingDown()) {
- AtomicDecrement(GuardCount_);
- return { };
- }
-
- return TShutdownGuard(this);
- };
-
- bool IsUnsafeToShutdown() const override {
- return AtomicGet(GuardCount_) > 0;
- }
-
- size_t RequestsInProgress() const override {
+ TShutdownGuard ProtectShutdown() noexcept {
+ AtomicIncrement(GuardCount_);
+ if (IsShuttingDown()) {
+ AtomicDecrement(GuardCount_);
+ return { };
+ }
+
+ return TShutdownGuard(this);
+ };
+
+ bool IsUnsafeToShutdown() const override {
+ return AtomicGet(GuardCount_) > 0;
+ }
+
+ size_t RequestsInProgress() const override {
 size_t c = 0;
 with_lock(Lock_) {
 c = Requests_.size();
@@ -268,9 +268,9 @@ public:
 return c;
 }
- void SetServerOptions(const TServerOptions& options) override {
- SslServer_ = bool(options.SslData);
- NeedAuth_ = options.UseAuth;
+ void SetServerOptions(const TServerOptions& options) override {
+ SslServer_ = bool(options.SslData);
+ NeedAuth_ = options.UseAuth;
 }
 void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
@@ -280,32 +280,32 @@ public:
 return AtomicGet(ShuttingDown_);
 }
- bool SslServer() const {
- return SslServer_;
- }
-
+ bool SslServer() const {
+ return SslServer_;
+ }
+
 bool NeedAuth() const {
 return NeedAuth_;
 }
- bool RegisterRequestCtx(ICancelableContext* req) {
+ bool RegisterRequestCtx(ICancelableContext* req) {
 with_lock(Lock_) {
- auto r = Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
-
- if (IsShuttingDown()) {
- // Server is already shutting down
- Requests_.erase(r.first);
- return false;
- }
+ auto r = Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
+
+ if (IsShuttingDown()) {
+ // Server is already shutting down
+ Requests_.erase(r.first);
+ return false;
+ }
 }
-
- return true;
+
+ return true;
 }
 void DeregisterRequestCtx(ICancelableContext* req) {
 with_lock(Lock_) {
- Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
 }
 }
@@ -313,15 +313,15 @@ protected:
 using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
 TGrpcAsyncService Service_;
- TGrpcAsyncService* GetService() override {
+ TGrpcAsyncService* GetService() override {
 return &Service_;
 }
private:
 TAtomic ShuttingDown_ = 0;
- TAtomic GuardCount_ = 0;
+ TAtomic GuardCount_ = 0;
- bool SslServer_ = false;
+ bool SslServer_ = false;
 bool NeedAuth_ = false;
 THashSet<ICancelableContext*> Requests_;
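
The TShutdownGuard and ProtectShutdown changes above implement a guard-counter handshake between request startup and service shutdown: ProtectShutdown() increments GuardCount_ before checking the shutdown flag and backs out if the flag is already set, while TGRpcServer::Stop() polls IsUnsafeToShutdown() and RequestsInProgress() until every guard has been released. Below is a minimal standalone sketch of that pattern, assuming std::atomic in place of the library's TAtomic/AtomicIncrement helpers and omitting request registration and the gRPC plumbing; the class and method names mirror the diff for readability but are illustrative, not the library's actual code.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

class TService {
public:
    // RAII guard: while alive it keeps the service "unsafe to shut down".
    class TGuard {
    public:
        explicit TGuard(TService* owner = nullptr) : Owner_(owner) {}
        ~TGuard() { if (Owner_) Owner_->GuardCount_.fetch_sub(1); }
        TGuard(TGuard&& other) : Owner_(other.Owner_) { other.Owner_ = nullptr; }
        TGuard(const TGuard&) = delete;
        TGuard& operator=(const TGuard&) = delete;
        explicit operator bool() const { return Owner_ != nullptr; }
    private:
        TService* Owner_;
    };

    // Increment first, then check the flag: a concurrent Stop() either
    // sees the raised counter or this caller sees the flag and backs out.
    TGuard ProtectShutdown() {
        GuardCount_.fetch_add(1);
        if (ShuttingDown_.load()) {
            GuardCount_.fetch_sub(1);
            return TGuard(); // disengaged: caller must drop the request
        }
        return TGuard(this);
    }

    // Mirrors the IsUnsafeToShutdown()/Sleep(10ms) polling loop
    // in TGRpcServer::Stop above.
    void Stop() {
        ShuttingDown_.store(true);
        while (GuardCount_.load() > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

private:
    std::atomic<bool> ShuttingDown_{false};
    std::atomic<int> GuardCount_{0};
};

int main() {
    TService service;
    std::thread worker([&] {
        if (auto guard = service.ProtectShutdown()) {
            std::printf("request running under guard\n");
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        } // guard destroyed here; Stop() may now finish draining
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    service.Stop(); // blocks until the worker's guard is released
    worker.join();
    std::printf("shutdown complete\n");
}

The ordering is what makes the handshake race-free: because the counter increment happens before the flag check, a concurrent StopService() either observes the new guard (and shutdown waits) or the request observes ShuttingDown_ (and drops itself), the same recheck-under-protection idea as the IsShuttingDown() test inside RegisterRequestCtx() in the diff.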