diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:02 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:02 +0300 |
commit | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch) | |
tree | c2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/grpc/server/grpc_request.h | |
parent | ab3783171cc30e262243a0227c86118f7080c896 (diff) | |
download | ydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.h')
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 92 |
1 files changed, 46 insertions, 46 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 5bd8d3902b..dd9041eec7 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -4,19 +4,19 @@ #include <google/protobuf/arena.h> #include <google/protobuf/message.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/priority.h> -#include "grpc_response.h" +#include "grpc_response.h" #include "event_callback.h" -#include "grpc_async_ctx_base.h" -#include "grpc_counters.h" +#include "grpc_async_ctx_base.h" +#include "grpc_counters.h" #include "grpc_request_base.h" #include "grpc_server.h" -#include "logger.h" - -#include <util/system/hp_timer.h> +#include "logger.h" +#include <util/system/hp_timer.h> + #include <grpc++/server.h> #include <grpc++/server_context.h> #include <grpc++/support/async_stream.h> @@ -24,7 +24,7 @@ #include <grpc++/support/byte_buffer.h> #include <grpc++/impl/codegen/async_stream.h> -namespace NGrpc { +namespace NGrpc { class IStreamAdaptor { public: @@ -57,7 +57,7 @@ public: grpc::ServerCompletionQueue* cq, TOnRequest cb, TRequestCallback requestCallback, - const char* name, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) @@ -67,10 +67,10 @@ public: , RequestCallback_(requestCallback) , StreamRequestCallback_(nullptr) , Name_(name) - , Logger_(std::move(logger)) + , Logger_(std::move(logger)) , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) - , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) + , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); @@ -85,7 +85,7 @@ public: grpc::ServerCompletionQueue* cq, TOnRequest cb, TStreamRequestCallback requestCallback, - const char* name, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) @@ -95,7 +95,7 @@ public: , RequestCallback_(nullptr) , StreamRequestCallback_(requestCallback) , Name_(name) - , Logger_(std::move(logger)) + , Logger_(std::move(logger)) , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) @@ -157,13 +157,13 @@ public: TInstant Deadline() const override { return TBaseAsyncContext<TService>::Deadline(); - } - - TSet<TStringBuf> GetPeerMetaKeys() const override { - return TBaseAsyncContext<TService>::GetPeerMetaKeys(); - } - - TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { + } + + TSet<TStringBuf> GetPeerMetaKeys() const override { + return TBaseAsyncContext<TService>::GetPeerMetaKeys(); + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { return TBaseAsyncContext<TService>::GetPeerMetaValues(key); } @@ -233,10 +233,10 @@ private: if (!Server_->IsShuttingDown()) { if (RequestCallback_) { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { MakeIntrusive<TThis>( - Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } } @@ -257,20 +257,20 @@ private: StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); - Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - - // because of std::function cannot hold move-only captured object - // we allocate shared object on heap to avoid message copy - auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); - auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid message copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; - StreamWriter_->Write(*uResp, GetGRpcTag()); + StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); } @@ -283,20 +283,20 @@ private: this->Context.peer().c_str()); StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; - Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, this->Context.peer().c_str()); - - // because of std::function cannot hold move-only captured object - // we allocate shared object on heap to avoid buffer copy - auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); - auto cb = [this, uResp = std::move(uResp), sz]() { + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid buffer copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz]() { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", this, Name_, this->Context.peer().c_str()); StateFunc_ = &TThis::NextReply; ResponseSize += sz; - StreamWriter_->Write(*uResp, GetGRpcTag()); + StreamWriter_->Write(*uResp, GetGRpcTag()); }; StreamAdaptor_->Enqueue(std::move(cb), false); } @@ -314,8 +314,8 @@ private: GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); StateFunc_ = &TThis::SetFinishError; - TOut resp; - Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); + TOut resp; + Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); } else { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); @@ -380,7 +380,7 @@ private: } auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket")); if (maybeToken.empty() || maybeToken[0].empty()) { - TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; + TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; Counters_->CountRequestsWithoutToken(); GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " "Name# %s data# %s peer# %s database# %s", this, Name_, @@ -484,12 +484,12 @@ private: TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; - const char* const Name_; + const char* const Name_; TLoggerPtr Logger_; ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; - THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; + THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; TStateFunc StateFunc_; TIn* Request_; @@ -520,10 +520,10 @@ public: typename TBase::TOnRequest cb, typename TBase::TRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter = nullptr) - : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} { } @@ -533,11 +533,11 @@ public: typename TBase::TOnRequest cb, typename TBase::TStreamRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters) - : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} { } }; -} // namespace NGrpc +} // namespace NGrpc |