diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:43 +0300 |
commit | 330c83f8c116bd45316397b179275e9d87007e7d (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/grpc/client/grpc_client_low.h | |
parent | 22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff) | |
download | ydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz |
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.h')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 1234 |
1 files changed, 617 insertions, 617 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index 5d0983f804..ab0a0627be 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -52,125 +52,125 @@ public: virtual void Destroy() = 0; }; -// Implementation of IQueueClientEvent that reduces allocations -template<class TSelf> -class TQueueClientFixedEvent : private IQueueClientEvent { - using TCallback = void (TSelf::*)(bool); - -public: - TQueueClientFixedEvent(TSelf* self, TCallback callback) - : Self(self) - , Callback(callback) - { } - - IQueueClientEvent* Prepare() { - Self->Ref(); - return this; - } - -private: - bool Execute(bool ok) override { - ((*Self).*Callback)(ok); - return false; - } - - void Destroy() override { - Self->UnRef(); - } - -private: - TSelf* const Self; - TCallback const Callback; -}; - -class IQueueClientContext; -using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>; - -// Provider of IQueueClientContext instances -class IQueueClientContextProvider { -public: - virtual ~IQueueClientContextProvider() = default; - - virtual IQueueClientContextPtr CreateContext() = 0; -}; - -// Activity context for a low-level client -class IQueueClientContext : public IQueueClientContextProvider { -public: - virtual ~IQueueClientContext() = default; - - //! Returns CompletionQueue associated with the client - virtual grpc::CompletionQueue* CompletionQueue() = 0; - - //! Returns true if context has been cancelled - virtual bool IsCancelled() const = 0; - - //! Tries to cancel context, calling all registered callbacks - virtual bool Cancel() = 0; - - //! Subscribes callback to cancellation - // - // Note there's no way to unsubscribe, if subscription is temporary - // make sure you create a new context with CreateContext and release - // it as soon as it's no longer needed. - virtual void SubscribeCancel(std::function<void()> callback) = 0; - - //! Subscribes callback to cancellation - // - // This alias is for compatibility with older code. - void SubscribeStop(std::function<void()> callback) { - SubscribeCancel(std::move(callback)); - } -}; - +// Implementation of IQueueClientEvent that reduces allocations +template<class TSelf> +class TQueueClientFixedEvent : private IQueueClientEvent { + using TCallback = void (TSelf::*)(bool); + +public: + TQueueClientFixedEvent(TSelf* self, TCallback callback) + : Self(self) + , Callback(callback) + { } + + IQueueClientEvent* Prepare() { + Self->Ref(); + return this; + } + +private: + bool Execute(bool ok) override { + ((*Self).*Callback)(ok); + return false; + } + + void Destroy() override { + Self->UnRef(); + } + +private: + TSelf* const Self; + TCallback const Callback; +}; + +class IQueueClientContext; +using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>; + +// Provider of IQueueClientContext instances +class IQueueClientContextProvider { +public: + virtual ~IQueueClientContextProvider() = default; + + virtual IQueueClientContextPtr CreateContext() = 0; +}; + +// Activity context for a low-level client +class IQueueClientContext : public IQueueClientContextProvider { +public: + virtual ~IQueueClientContext() = default; + + //! Returns CompletionQueue associated with the client + virtual grpc::CompletionQueue* CompletionQueue() = 0; + + //! Returns true if context has been cancelled + virtual bool IsCancelled() const = 0; + + //! Tries to cancel context, calling all registered callbacks + virtual bool Cancel() = 0; + + //! Subscribes callback to cancellation + // + // Note there's no way to unsubscribe, if subscription is temporary + // make sure you create a new context with CreateContext and release + // it as soon as it's no longer needed. + virtual void SubscribeCancel(std::function<void()> callback) = 0; + + //! Subscribes callback to cancellation + // + // This alias is for compatibility with older code. + void SubscribeStop(std::function<void()> callback) { + SubscribeCancel(std::move(callback)); + } +}; + // Represents grpc status and error message string struct TGrpcStatus { - TString Msg; + TString Msg; TString Details; - int GRpcStatusCode; - bool InternalError; - - TGrpcStatus() - : GRpcStatusCode(grpc::StatusCode::OK) - , InternalError(false) - { } - - TGrpcStatus(TString msg, int statusCode, bool internalError) - : Msg(std::move(msg)) - , GRpcStatusCode(statusCode) - , InternalError(internalError) - { } - + int GRpcStatusCode; + bool InternalError; + + TGrpcStatus() + : GRpcStatusCode(grpc::StatusCode::OK) + , InternalError(false) + { } + + TGrpcStatus(TString msg, int statusCode, bool internalError) + : Msg(std::move(msg)) + , GRpcStatusCode(statusCode) + , InternalError(internalError) + { } + TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {}) - : Msg(std::move(msg)) + : Msg(std::move(msg)) , Details(std::move(details)) - , GRpcStatusCode(status) - , InternalError(false) - { } - - TGrpcStatus(const grpc::Status& status) + , GRpcStatusCode(status) + , InternalError(false) + { } + + TGrpcStatus(const grpc::Status& status) : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details())) - { } - - TGrpcStatus& operator=(const grpc::Status& status) { - Msg = TString(status.error_message()); + { } + + TGrpcStatus& operator=(const grpc::Status& status) { + Msg = TString(status.error_message()); Details = TString(status.error_details()); - GRpcStatusCode = status.error_code(); - InternalError = false; - return *this; - } - - static TGrpcStatus Internal(TString msg) { - return { std::move(msg), -1, true }; - } - - bool Ok() const { - return !InternalError && GRpcStatusCode == grpc::StatusCode::OK; - } + GRpcStatusCode = status.error_code(); + InternalError = false; + return *this; + } + + static TGrpcStatus Internal(TString msg) { + return { std::move(msg), -1, true }; + } + + bool Ok() const { + return !InternalError && GRpcStatusCode == grpc::StatusCode::OK; + } }; bool inline IsGRpcStatusGood(const TGrpcStatus& status) { - return status.Ok(); + return status.Ok(); } // Response callback type - this callback will be called when request is finished @@ -241,9 +241,9 @@ public: { } ~TSimpleRequestProcessor() { - if (!Replied_ && Callback_) { - Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible + if (!Replied_ && Callback_) { + Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); + Callback_ = nullptr; // free resources as early as possible } } @@ -251,53 +251,53 @@ public: { std::unique_lock<std::mutex> guard(Mutex_); LocalContext.reset(); - } - TGrpcStatus status; - if (ok) { + } + TGrpcStatus status; + if (ok) { status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); } Replied_ = true; Callback_(std::move(status), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible + Callback_ = nullptr; // free resources as early as possible return false; } void Destroy() override { - UnRef(); + UnRef(); } private: - IQueueClientEvent* FinishedEvent() { - Ref(); - return this; + IQueueClientEvent* FinishedEvent() { + Ref(); + return this; } void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { - auto context = provider->CreateContext(); - if (!context) { - Replied_ = true; - Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); - Callback_ = nullptr; - return; - } + auto context = provider->CreateContext(); + if (!context) { + Replied_ = true; + Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); + Callback_ = nullptr; + return; + } { std::unique_lock<std::mutex> guard(Mutex_); LocalContext = context; Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); Reader_->Finish(&Reply_, &Status, FinishedEvent()); - } - context->SubscribeStop([self = TPtr(this)] { - self->Stop(); - }); - } - - void Stop() { + } + context->SubscribeStop([self = TPtr(this)] { + self->Stop(); + }); + } + + void Stop() { Context.TryCancel(); - } - - TResponseCallback<TResponse> Callback_; + } + + TResponseCallback<TResponse> Callback_; TResponse Reply_; std::mutex Mutex_; TAsyncReaderPtr Reader_; @@ -387,49 +387,49 @@ private: template<class TResponse> class IStreamRequestReadProcessor : public TThrRefBase { -public: +public: using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>; - using TReadCallback = std::function<void(TGrpcStatus&&)>; - - /** - * Asynchronously cancel the request - */ - virtual void Cancel() = 0; - - /** + using TReadCallback = std::function<void(TGrpcStatus&&)>; + + /** + * Asynchronously cancel the request + */ + virtual void Cancel() = 0; + + /** * Scheduled initial server metadata read from the stream */ virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0; /** - * Scheduled response read from the stream - * Callback will be called with the status if it failed - * Only one Read or Finish call may be active at a time - */ - virtual void Read(TResponse* response, TReadCallback callback) = 0; - - /** - * Stop reading and gracefully finish the stream - * Only one Read or Finish call may be active at a time - */ - virtual void Finish(TReadCallback callback) = 0; - - /** - * Additional callback to be called when stream has finished - */ - virtual void AddFinishedCallback(TReadCallback callback) = 0; -}; - + * Scheduled response read from the stream + * Callback will be called with the status if it failed + * Only one Read or Finish call may be active at a time + */ + virtual void Read(TResponse* response, TReadCallback callback) = 0; + + /** + * Stop reading and gracefully finish the stream + * Only one Read or Finish call may be active at a time + */ + virtual void Finish(TReadCallback callback) = 0; + + /** + * Additional callback to be called when stream has finished + */ + virtual void AddFinishedCallback(TReadCallback callback) = 0; +}; + template<class TRequest, class TResponse> class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> { public: using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>; - using TWriteCallback = std::function<void(TGrpcStatus&&)>; + using TWriteCallback = std::function<void(TGrpcStatus&&)>; /** * Scheduled request write to the stream */ - virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; + virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; }; class TGRpcKeepAliveSocketMutator; @@ -548,7 +548,7 @@ public: { std::unique_lock<std::mutex> guard(Mutex); Cancelled = true; - if (Started && !ReadFinished) { + if (Started && !ReadFinished) { if (!ReadActive) { ReadFinished = true; } @@ -640,31 +640,31 @@ public: callback(std::move(status)); } - - void AddFinishedCallback(TReadCallback callback) override { - Y_VERIFY(callback, "Unexpected empty callback"); - - TGrpcStatus status; - + + void AddFinishedCallback(TReadCallback callback) override { + Y_VERIFY(callback, "Unexpected empty callback"); + + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (!Finished) { - FinishedCallbacks.emplace_back().swap(callback); - return; - } - - if (FinishedOk) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - + if (!Finished) { + FinishedCallbacks.emplace_back().swap(callback); + return; + } + + if (FinishedOk) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + private: void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { auto context = provider->CreateContext(); @@ -727,8 +727,8 @@ private: { std::unique_lock<std::mutex> guard(Mutex); - Started = true; - if (!ok || Cancelled) { + Started = true; + if (!ok || Cancelled) { ReadFinished = true; Stream->Finish(&Status, OnFinishedTag.Prepare()); return; @@ -743,7 +743,7 @@ private: void OnFinished(bool ok) { TGrpcStatus status; std::vector<TReadCallback> finishedCallbacks; - TReaderCallback startCallback; + TReaderCallback startCallback; TReadCallback readCallback; TReadCallback finishCallback; @@ -756,19 +756,19 @@ private: if (ok) { status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); } else { status = TGrpcStatus::Internal("Unexpected error"); } - finishedCallbacks.swap(FinishedCallbacks); - - if (Callback) { - Y_VERIFY(!ReadActive); - startCallback = std::move(Callback); - Callback = nullptr; - } else if (ReadActive) { + finishedCallbacks.swap(FinishedCallbacks); + + if (Callback) { + Y_VERIFY(!ReadActive); + startCallback = std::move(Callback); + Callback = nullptr; + } else if (ReadActive) { if (ReadCallback) { readCallback = std::move(ReadCallback); ReadCallback = nullptr; @@ -780,18 +780,18 @@ private: } } - for (auto& finishedCallback : finishedCallbacks) { - auto statusCopy = status; - finishedCallback(std::move(statusCopy)); - } - - if (startCallback) { + for (auto& finishedCallback : finishedCallbacks) { + auto statusCopy = status; + finishedCallback(std::move(statusCopy)); + } + + if (startCallback) { + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); + } + startCallback(std::move(status), nullptr); + } else if (readCallback) { if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); - } - startCallback(std::move(status), nullptr); - } else if (readCallback) { - if (status.Ok()) { status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); } readCallback(std::move(status)); @@ -812,7 +812,7 @@ private: TReadCallback FinishCallback; std::vector<TReadCallback> FinishedCallbacks; std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; - bool Started = false; + bool Started = false; bool HasInitialMetadata = false; bool ReadActive = false; bool ReadFinished = false; @@ -821,72 +821,72 @@ private: bool FinishedOk = false; }; -template<class TRequest, class TResponse> +template<class TRequest, class TResponse> using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>; - -template<class TStub, class TRequest, class TResponse> + +template<class TStub, class TRequest, class TResponse> class TStreamRequestReadWriteProcessor : public IStreamRequestReadWriteProcessor<TRequest, TResponse> , public TGRpcRequestProcessorCommon { -public: +public: using TSelf = TStreamRequestReadWriteProcessor; using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>; - using TPtr = TIntrusivePtr<TSelf>; - using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>; - using TReadCallback = typename TBase::TReadCallback; - using TWriteCallback = typename TBase::TWriteCallback; - using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>; - using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); - + using TPtr = TIntrusivePtr<TSelf>; + using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>; + using TReadCallback = typename TBase::TReadCallback; + using TWriteCallback = typename TBase::TWriteCallback; + using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>; + using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); + explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback) - : ConnectedCallback(std::move(callback)) - { - Y_VERIFY(ConnectedCallback, "Missing connected callback"); - } - - void Cancel() override { - Context.TryCancel(); - + : ConnectedCallback(std::move(callback)) + { + Y_VERIFY(ConnectedCallback, "Missing connected callback"); + } + + void Cancel() override { + Context.TryCancel(); + { std::unique_lock<std::mutex> guard(Mutex); - Cancelled = true; - if (Started && !(ReadFinished && WriteFinished)) { - if (!ReadActive) { - ReadFinished = true; - } - if (!WriteActive) { - WriteFinished = true; - } - if (ReadFinished && WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - } - } - - void Write(TRequest&& request, TWriteCallback callback) override { - TGrpcStatus status; - + Cancelled = true; + if (Started && !(ReadFinished && WriteFinished)) { + if (!ReadActive) { + ReadFinished = true; + } + if (!WriteActive) { + WriteFinished = true; + } + if (ReadFinished && WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + } + } + + void Write(TRequest&& request, TWriteCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (Cancelled || ReadFinished || WriteFinished) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); - } else if (WriteActive) { - auto& item = WriteQueue.emplace_back(); - item.Callback.swap(callback); - item.Request.Swap(&request); - } else { - WriteActive = true; - WriteCallback.swap(callback); - Stream->Write(request, OnWriteDoneTag.Prepare()); - } - } - - if (!status.Ok() && callback) { - callback(std::move(status)); - } - } - + if (Cancelled || ReadFinished || WriteFinished) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); + } else if (WriteActive) { + auto& item = WriteQueue.emplace_back(); + item.Callback.swap(callback); + item.Request.Swap(&request); + } else { + WriteActive = true; + WriteCallback.swap(callback); + Stream->Write(request, OnWriteDoneTag.Prepare()); + } + } + + if (!status.Ok() && callback) { + callback(std::move(status)); + } + } + void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { TGrpcStatus status; @@ -916,321 +916,321 @@ public: callback(std::move(status)); } - void Read(TResponse* message, TReadCallback callback) override { - TGrpcStatus status; - + void Read(TResponse* message, TReadCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - ReadCallback = std::move(callback); - if (!ReadFinished) { - Stream->Read(message, OnReadDoneTag.Prepare()); - } - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - - callback(std::move(status)); - } - - void Finish(TReadCallback callback) override { - TGrpcStatus status; - + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + ReadCallback = std::move(callback); + if (!ReadFinished) { + Stream->Read(message, OnReadDoneTag.Prepare()); + } + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + + callback(std::move(status)); + } + + void Finish(TReadCallback callback) override { + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - FinishCallback = std::move(callback); - if (!ReadFinished) { - ReadFinished = true; - if (!WriteActive) { - WriteFinished = true; - } - if (WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - - void AddFinishedCallback(TReadCallback callback) override { - Y_VERIFY(callback, "Unexpected empty callback"); - - TGrpcStatus status; - + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + FinishCallback = std::move(callback); + if (!ReadFinished) { + ReadFinished = true; + if (!WriteActive) { + WriteFinished = true; + } + if (WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + + void AddFinishedCallback(TReadCallback callback) override { + Y_VERIFY(callback, "Unexpected empty callback"); + + TGrpcStatus status; + { std::unique_lock<std::mutex> guard(Mutex); - if (!Finished) { - FinishedCallbacks.emplace_back().swap(callback); - return; - } - - if (FinishedOk) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } - -private: - template<typename> friend class TServiceConnection; - - void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { - auto context = provider->CreateContext(); - if (!context) { - auto callback = std::move(ConnectedCallback); - TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); - callback(std::move(status), nullptr); - return; - } - + if (!Finished) { + FinishedCallbacks.emplace_back().swap(callback); + return; + } + + if (FinishedOk) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } + +private: + template<typename> friend class TServiceConnection; + + void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { + auto context = provider->CreateContext(); + if (!context) { + auto callback = std::move(ConnectedCallback); + TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); + callback(std::move(status), nullptr); + return; + } + { std::unique_lock<std::mutex> guard(Mutex); - LocalContext = context; - Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare()); - } - - context->SubscribeStop([self = TPtr(this)] { - self->Cancel(); - }); - } - -private: - void OnConnected(bool ok) { - TConnectedCallback callback; - + LocalContext = context; + Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare()); + } + + context->SubscribeStop([self = TPtr(this)] { + self->Cancel(); + }); + } + +private: + void OnConnected(bool ok) { + TConnectedCallback callback; + { std::unique_lock<std::mutex> guard(Mutex); - Started = true; - if (!ok || Cancelled) { - ReadFinished = true; - WriteFinished = true; - Stream->Finish(&Status, OnFinishedTag.Prepare()); - return; - } - - callback = std::move(ConnectedCallback); - ConnectedCallback = nullptr; - } - - callback({ }, typename TBase::TPtr(this)); - } - - void OnReadDone(bool ok) { - TGrpcStatus status; - TReadCallback callback; + Started = true; + if (!ok || Cancelled) { + ReadFinished = true; + WriteFinished = true; + Stream->Finish(&Status, OnFinishedTag.Prepare()); + return; + } + + callback = std::move(ConnectedCallback); + ConnectedCallback = nullptr; + } + + callback({ }, typename TBase::TPtr(this)); + } + + void OnReadDone(bool ok) { + TGrpcStatus status; + TReadCallback callback; std::unordered_multimap<TString, TString>* initialMetadata = nullptr; - + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(ReadActive, "Unexpected Read done callback"); - Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); - - if (!ok || Cancelled || WriteFinished) { - ReadFinished = true; - if (!WriteActive) { - WriteFinished = true; - } - if (WriteFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - if (!ok) { - // Keep ReadActive=true, so callback is called - // after the call is finished with an error - return; - } - } - - callback = std::move(ReadCallback); - ReadCallback = nullptr; - ReadActive = false; + Y_VERIFY(ReadActive, "Unexpected Read done callback"); + Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); + + if (!ok || Cancelled || WriteFinished) { + ReadFinished = true; + if (!WriteActive) { + WriteFinished = true; + } + if (WriteFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + if (!ok) { + // Keep ReadActive=true, so callback is called + // after the call is finished with an error + return; + } + } + + callback = std::move(ReadCallback); + ReadCallback = nullptr; + ReadActive = false; initialMetadata = InitialMetadata; InitialMetadata = nullptr; HasInitialMetadata = true; - } - + } + if (initialMetadata) { GetInitialMetadata(initialMetadata); } - callback(std::move(status)); - } - - void OnWriteDone(bool ok) { - TWriteCallback okCallback; - + callback(std::move(status)); + } + + void OnWriteDone(bool ok) { + TWriteCallback okCallback; + { std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(WriteActive, "Unexpected Write done callback"); - Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); - - if (ok) { - okCallback.swap(WriteCallback); - } else if (WriteCallback) { - // Put callback back on the queue until OnFinished - auto& item = WriteQueue.emplace_front(); - item.Callback.swap(WriteCallback); - } - - if (!ok || Cancelled) { - WriteActive = false; - WriteFinished = true; - if (!ReadActive) { - ReadFinished = true; - } - if (ReadFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } + Y_VERIFY(WriteActive, "Unexpected Write done callback"); + Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); + + if (ok) { + okCallback.swap(WriteCallback); + } else if (WriteCallback) { + // Put callback back on the queue until OnFinished + auto& item = WriteQueue.emplace_front(); + item.Callback.swap(WriteCallback); + } + + if (!ok || Cancelled) { + WriteActive = false; + WriteFinished = true; + if (!ReadActive) { + ReadFinished = true; + } + if (ReadFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } } else if (!WriteQueue.empty()) { - WriteCallback.swap(WriteQueue.front().Callback); - Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare()); - WriteQueue.pop_front(); - } else { - WriteActive = false; - if (ReadFinished) { - WriteFinished = true; - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - } - - if (okCallback) { - okCallback(TGrpcStatus()); - } - } - - void OnFinished(bool ok) { - TGrpcStatus status; + WriteCallback.swap(WriteQueue.front().Callback); + Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare()); + WriteQueue.pop_front(); + } else { + WriteActive = false; + if (ReadFinished) { + WriteFinished = true; + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + } + + if (okCallback) { + okCallback(TGrpcStatus()); + } + } + + void OnFinished(bool ok) { + TGrpcStatus status; std::deque<TWriteItem> writesDropped; std::vector<TReadCallback> finishedCallbacks; - TConnectedCallback connectedCallback; - TReadCallback readCallback; - TReadCallback finishCallback; - + TConnectedCallback connectedCallback; + TReadCallback readCallback; + TReadCallback finishCallback; + { std::unique_lock<std::mutex> guard(Mutex); - Finished = true; - FinishedOk = ok; - LocalContext.reset(); - - if (ok) { - status = Status; - } else if (Cancelled) { - status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - - writesDropped.swap(WriteQueue); - finishedCallbacks.swap(FinishedCallbacks); - - if (ConnectedCallback) { - Y_VERIFY(!ReadActive); - connectedCallback = std::move(ConnectedCallback); - ConnectedCallback = nullptr; - } else if (ReadActive) { - if (ReadCallback) { - readCallback = std::move(ReadCallback); - ReadCallback = nullptr; - } else { - finishCallback = std::move(FinishCallback); - FinishCallback = nullptr; - } - ReadActive = false; - } - } - - for (auto& item : writesDropped) { - if (item.Callback) { - TGrpcStatus writeStatus = status; - if (writeStatus.Ok()) { - writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); - } - item.Callback(std::move(writeStatus)); - } - } - - for (auto& finishedCallback : finishedCallbacks) { - TGrpcStatus statusCopy = status; - finishedCallback(std::move(statusCopy)); - } - - if (connectedCallback) { - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); - } - connectedCallback(std::move(status), nullptr); - } else if (readCallback) { - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - readCallback(std::move(status)); - } else if (finishCallback) { - finishCallback(std::move(status)); - } - } - -private: - struct TWriteItem { - TWriteCallback Callback; - TRequest Request; - }; - -private: - using TFixedEvent = TQueueClientFixedEvent<TSelf>; - - TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected }; - TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; - TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone }; - TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; - -private: + Finished = true; + FinishedOk = ok; + LocalContext.reset(); + + if (ok) { + status = Status; + } else if (Cancelled) { + status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + + writesDropped.swap(WriteQueue); + finishedCallbacks.swap(FinishedCallbacks); + + if (ConnectedCallback) { + Y_VERIFY(!ReadActive); + connectedCallback = std::move(ConnectedCallback); + ConnectedCallback = nullptr; + } else if (ReadActive) { + if (ReadCallback) { + readCallback = std::move(ReadCallback); + ReadCallback = nullptr; + } else { + finishCallback = std::move(FinishCallback); + FinishCallback = nullptr; + } + ReadActive = false; + } + } + + for (auto& item : writesDropped) { + if (item.Callback) { + TGrpcStatus writeStatus = status; + if (writeStatus.Ok()) { + writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); + } + item.Callback(std::move(writeStatus)); + } + } + + for (auto& finishedCallback : finishedCallbacks) { + TGrpcStatus statusCopy = status; + finishedCallback(std::move(statusCopy)); + } + + if (connectedCallback) { + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); + } + connectedCallback(std::move(status), nullptr); + } else if (readCallback) { + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + readCallback(std::move(status)); + } else if (finishCallback) { + finishCallback(std::move(status)); + } + } + +private: + struct TWriteItem { + TWriteCallback Callback; + TRequest Request; + }; + +private: + using TFixedEvent = TQueueClientFixedEvent<TSelf>; + + TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected }; + TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; + TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone }; + TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; + +private: std::mutex Mutex; - TAsyncReaderWriterPtr Stream; - TConnectedCallback ConnectedCallback; - TReadCallback ReadCallback; - TReadCallback FinishCallback; + TAsyncReaderWriterPtr Stream; + TConnectedCallback ConnectedCallback; + TReadCallback ReadCallback; + TReadCallback FinishCallback; std::vector<TReadCallback> FinishedCallbacks; std::deque<TWriteItem> WriteQueue; - TWriteCallback WriteCallback; + TWriteCallback WriteCallback; std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; - bool Started = false; + bool Started = false; bool HasInitialMetadata = false; - bool ReadActive = false; - bool ReadFinished = false; - bool WriteActive = false; - bool WriteFinished = false; - bool Finished = false; - bool Cancelled = false; - bool FinishedOk = false; -}; - + bool ReadActive = false; + bool ReadFinished = false; + bool WriteActive = false; + bool WriteFinished = false; + bool Finished = false; + bool Cancelled = false; + bool FinishedOk = false; +}; + class TGRpcClientLow; template<typename TGRpcService> @@ -1245,9 +1245,9 @@ public: template<typename TRequest, typename TResponse> void DoRequest(const TRequest& request, TResponseCallback<TResponse> callback, - typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, + typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) + IQueueClientContextProvider* provider = nullptr) { auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); processor->ApplyMeta(metas); @@ -1282,31 +1282,31 @@ public: processor->ApplyMeta(metas); processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_); } - + /* * Start streaming response reading (one request, many responses) */ - template<typename TRequest, typename TResponse> + template<typename TRequest, typename TResponse> void DoStreamRequest(const TRequest& request, TStreamReaderCallback<TResponse> callback, typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, - const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) - { + const TCallMeta& metas = { }, + IQueueClientContextProvider* provider = nullptr) + { auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback)); - processor->ApplyMeta(metas); + processor->ApplyMeta(metas); processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_); - } - + } + private: TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci, - IQueueClientContextProvider* provider) + IQueueClientContextProvider* provider) : Stub_(TGRpcService::NewStub(ci)) - , Provider_(provider) - { - Y_VERIFY(Provider_, "Connection does not have a queue provider"); - } - + , Provider_(provider) + { + Y_VERIFY(Provider_, "Connection does not have a queue provider"); + } + TServiceConnection(TStubsHolder& holder, IQueueClientContextProvider* provider) : Stub_(holder.GetOrCreateStub<TStub>()) @@ -1316,47 +1316,47 @@ private: } std::shared_ptr<TStub> Stub_; - IQueueClientContextProvider* Provider_; + IQueueClientContextProvider* Provider_; }; class TGRpcClientLow : public IQueueClientContextProvider { - class TContextImpl; - friend class TContextImpl; - - enum ECqState : TAtomicBase { - WORKING = 0, - STOP_SILENT = 1, - STOP_EXPLICIT = 2, - }; - + class TContextImpl; + friend class TContextImpl; + + enum ECqState : TAtomicBase { + WORKING = 0, + STOP_SILENT = 1, + STOP_EXPLICIT = 2, + }; + public: explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); ~TGRpcClientLow(); - // Tries to stop all currently running requests (via their stop callbacks) - // Will shutdown CQ and drain events once all requests have finished - // No new requests may be started after this call - void Stop(bool wait = false); - - // Waits until all currently running requests finish execution - void WaitIdle(); - - inline bool IsStopping() const { - switch (GetCqState()) { - case WORKING: - return false; - case STOP_SILENT: - case STOP_EXPLICIT: - return true; - } - - Y_UNREACHABLE(); - } - - IQueueClientContextPtr CreateContext() override; - + // Tries to stop all currently running requests (via their stop callbacks) + // Will shutdown CQ and drain events once all requests have finished + // No new requests may be started after this call + void Stop(bool wait = false); + + // Waits until all currently running requests finish execution + void WaitIdle(); + + inline bool IsStopping() const { + switch (GetCqState()) { + case WORKING: + return false; + case STOP_SILENT: + case STOP_EXPLICIT: + return true; + } + + Y_UNREACHABLE(); + } + + IQueueClientContextPtr CreateContext() override; + template<typename TGRpcService> std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) { return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this)); @@ -1367,32 +1367,32 @@ public: return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this)); } - // Tests only, not thread-safe - void AddWorkerThreadForTest(); - + // Tests only, not thread-safe + void AddWorkerThreadForTest(); + private: using IThreadRef = std::unique_ptr<IThreadFactory::IThread>; using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; void Init(size_t numWorkerThread); - inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } - inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } - - void StopInternal(bool silent); - void WaitInternal(); - - void ForgetContext(TContextImpl* context); - -private: + inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } + inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } + + void StopInternal(bool silent); + void WaitInternal(); + + void ForgetContext(TContextImpl* context); + +private: bool UseCompletionQueuePerThread_; std::vector<CompletionQueueRef> CQS_; std::vector<IThreadRef> WorkerThreads_; - TAtomic CqState_ = -1; - + TAtomic CqState_ = -1; + std::mutex Mtx_; std::condition_variable ContextsEmpty_; std::unordered_set<TContextImpl*> Contexts_; - + std::mutex JoinMutex_; }; |