diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-02-10 16:49:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:42 +0300 |
commit | 0fd1998e1b2369f50fb694556f817d3c7fef10c8 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.h | |
parent | 26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (diff) | |
download | ydb-0fd1998e1b2369f50fb694556f817d3c7fef10c8.tar.gz |
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.h')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 190 |
1 files changed, 95 insertions, 95 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index d5ffe74736..ab0a0627be 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -126,7 +126,7 @@ public: // Represents grpc status and error message string struct TGrpcStatus { TString Msg; - TString Details; + TString Details; int GRpcStatusCode; bool InternalError; @@ -141,20 +141,20 @@ struct TGrpcStatus { , InternalError(internalError) { } - TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {}) + TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {}) : Msg(std::move(msg)) - , Details(std::move(details)) + , Details(std::move(details)) , GRpcStatusCode(status) , InternalError(false) { } TGrpcStatus(const grpc::Status& status) - : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details())) + : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details())) { } TGrpcStatus& operator=(const grpc::Status& status) { Msg = TString(status.error_message()); - Details = TString(status.error_details()); + Details = TString(status.error_details()); GRpcStatusCode = status.error_code(); InternalError = false; return *this; @@ -178,9 +178,9 @@ bool inline IsGRpcStatusGood(const TGrpcStatus& status) { template<typename TResponse> using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>; -template<typename TResponse> -using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>; - +template<typename TResponse> +using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>; + // Call associated metadata struct TCallMeta { std::shared_ptr<grpc::CallCredentials> CallCredentials; @@ -305,86 +305,86 @@ private: bool Replied_ = false; }; -template<typename TStub, typename TRequest, typename TResponse> -class TAdvancedRequestProcessor - : public TThrRefBase - , public IQueueClientEvent - , public TGRpcRequestProcessorCommon { - using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>; - template<typename> friend class TServiceConnection; -public: - using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>; - using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*); - +template<typename TStub, typename TRequest, typename TResponse> +class TAdvancedRequestProcessor + : public TThrRefBase + , public IQueueClientEvent + , public TGRpcRequestProcessorCommon { + using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>; + template<typename> friend class TServiceConnection; +public: + using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>; + using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*); + explicit TAdvancedRequestProcessor(TAdvancedResponseCallback<TResponse>&& callback) : Callback_(std::move(callback)) - { } - - ~TAdvancedRequestProcessor() { - if (!Replied_ && Callback_) { - Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible - } - } - - bool Execute(bool ok) override { + { } + + ~TAdvancedRequestProcessor() { + if (!Replied_ && Callback_) { + Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); + Callback_ = nullptr; // free resources as early as possible + } + } + + bool Execute(bool ok) override { { std::unique_lock<std::mutex> guard(Mutex_); - LocalContext.reset(); - } - TGrpcStatus status; - if (ok) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - Replied_ = true; - Callback_(Context, std::move(status), std::move(Reply_)); - Callback_ = nullptr; // free resources as early as possible - return false; - } - - void Destroy() override { - UnRef(); - } - -private: - IQueueClientEvent* FinishedEvent() { - Ref(); - return this; - } - - void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { - auto context = provider->CreateContext(); - if (!context) { - Replied_ = true; - Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); - Callback_ = nullptr; - return; - } + LocalContext.reset(); + } + TGrpcStatus status; + if (ok) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + Replied_ = true; + Callback_(Context, std::move(status), std::move(Reply_)); + Callback_ = nullptr; // free resources as early as possible + return false; + } + + void Destroy() override { + UnRef(); + } + +private: + IQueueClientEvent* FinishedEvent() { + Ref(); + return this; + } + + void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { + auto context = provider->CreateContext(); + if (!context) { + Replied_ = true; + Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_)); + Callback_ = nullptr; + return; + } { std::unique_lock<std::mutex> guard(Mutex_); - LocalContext = context; - Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); - Reader_->Finish(&Reply_, &Status, FinishedEvent()); - } - context->SubscribeStop([self = TPtr(this)] { - self->Stop(); - }); - } - - void Stop() { - Context.TryCancel(); - } - - TAdvancedResponseCallback<TResponse> Callback_; - TResponse Reply_; + LocalContext = context; + Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); + Reader_->Finish(&Reply_, &Status, FinishedEvent()); + } + context->SubscribeStop([self = TPtr(this)] { + self->Stop(); + }); + } + + void Stop() { + Context.TryCancel(); + } + + TAdvancedResponseCallback<TResponse> Callback_; + TResponse Reply_; std::mutex Mutex_; - TAsyncReaderPtr Reader_; - - bool Replied_ = false; -}; - + TAsyncReaderPtr Reader_; + + bool Replied_ = false; +}; + template<class TResponse> class IStreamRequestReadProcessor : public TThrRefBase { public: @@ -1255,21 +1255,21 @@ public: } /* - * Start simple request - */ - template<typename TRequest, typename TResponse> - void DoAdvancedRequest(const TRequest& request, + * Start simple request + */ + template<typename TRequest, typename TResponse> + void DoAdvancedRequest(const TRequest& request, TAdvancedResponseCallback<TResponse> callback, - typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, - const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) - { - auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); - processor->ApplyMeta(metas); - processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_); - } - - /* + typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, + const TCallMeta& metas = { }, + IQueueClientContextProvider* provider = nullptr) + { + auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); + processor->ApplyMeta(metas); + processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_); + } + + /* * Start bidirectional streamming */ template<typename TRequest, typename TResponse> |