aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.h
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@prnwatch.com>2022-02-10 16:49:42 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:42 +0300
commit0fd1998e1b2369f50fb694556f817d3c7fef10c8 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.h
parent26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (diff)
downloadydb-0fd1998e1b2369f50fb694556f817d3c7fef10c8.tar.gz
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.h')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h190
1 files changed, 95 insertions, 95 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index d5ffe74736..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -126,7 +126,7 @@ public:
// Represents grpc status and error message string
struct TGrpcStatus {
TString Msg;
- TString Details;
+ TString Details;
int GRpcStatusCode;
bool InternalError;
@@ -141,20 +141,20 @@ struct TGrpcStatus {
, InternalError(internalError)
{ }
- TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
+ TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
: Msg(std::move(msg))
- , Details(std::move(details))
+ , Details(std::move(details))
, GRpcStatusCode(status)
, InternalError(false)
{ }
TGrpcStatus(const grpc::Status& status)
- : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
+ : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
{ }
TGrpcStatus& operator=(const grpc::Status& status) {
Msg = TString(status.error_message());
- Details = TString(status.error_details());
+ Details = TString(status.error_details());
GRpcStatusCode = status.error_code();
InternalError = false;
return *this;
@@ -178,9 +178,9 @@ bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
template<typename TResponse>
using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
-template<typename TResponse>
-using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
-
+template<typename TResponse>
+using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
+
// Call associated metadata
struct TCallMeta {
std::shared_ptr<grpc::CallCredentials> CallCredentials;
@@ -305,86 +305,86 @@ private:
bool Replied_ = false;
};
-template<typename TStub, typename TRequest, typename TResponse>
-class TAdvancedRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
-public:
- using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
-
+template<typename TStub, typename TRequest, typename TResponse>
+class TAdvancedRequestProcessor
+ : public TThrRefBase
+ , public IQueueClientEvent
+ , public TGRpcRequestProcessorCommon {
+ using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
+ template<typename> friend class TServiceConnection;
+public:
+ using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>;
+ using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
+
explicit TAdvancedRequestProcessor(TAdvancedResponseCallback<TResponse>&& callback)
: Callback_(std::move(callback))
- { }
-
- ~TAdvancedRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- }
- }
-
- bool Execute(bool ok) override {
+ { }
+
+ ~TAdvancedRequestProcessor() {
+ if (!Replied_ && Callback_) {
+ Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
+ Callback_ = nullptr; // free resources as early as possible
+ }
+ }
+
+ bool Execute(bool ok) override {
{
std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(Context, std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- return false;
- }
-
- void Destroy() override {
- UnRef();
- }
-
-private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
- }
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
+ LocalContext.reset();
+ }
+ TGrpcStatus status;
+ if (ok) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ Replied_ = true;
+ Callback_(Context, std::move(status), std::move(Reply_));
+ Callback_ = nullptr; // free resources as early as possible
+ return false;
+ }
+
+ void Destroy() override {
+ UnRef();
+ }
+
+private:
+ IQueueClientEvent* FinishedEvent() {
+ Ref();
+ return this;
+ }
+
+ void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
+ auto context = provider->CreateContext();
+ if (!context) {
+ Replied_ = true;
+ Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
+ Callback_ = nullptr;
+ return;
+ }
{
std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
-
- void Stop() {
- Context.TryCancel();
- }
-
- TAdvancedResponseCallback<TResponse> Callback_;
- TResponse Reply_;
+ LocalContext = context;
+ Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
+ Reader_->Finish(&Reply_, &Status, FinishedEvent());
+ }
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Stop();
+ });
+ }
+
+ void Stop() {
+ Context.TryCancel();
+ }
+
+ TAdvancedResponseCallback<TResponse> Callback_;
+ TResponse Reply_;
std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
-
- bool Replied_ = false;
-};
-
+ TAsyncReaderPtr Reader_;
+
+ bool Replied_ = false;
+};
+
template<class TResponse>
class IStreamRequestReadProcessor : public TThrRefBase {
public:
@@ -1255,21 +1255,21 @@ public:
}
/*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoAdvancedRequest(const TRequest& request,
+ * Start simple request
+ */
+ template<typename TRequest, typename TResponse>
+ void DoAdvancedRequest(const TRequest& request,
TAdvancedResponseCallback<TResponse> callback,
- typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
-
- /*
+ typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+ const TCallMeta& metas = { },
+ IQueueClientContextProvider* provider = nullptr)
+ {
+ auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+ processor->ApplyMeta(metas);
+ processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
+ }
+
+ /*
* Start bidirectional streamming
*/
template<typename TRequest, typename TResponse>