diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
commit | a0ffafe83b7d6229709a32fa942c71d672ac989c (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/grpc_request.h | |
parent | c224a621661ddd69699f9476922eb316607ef57e (diff) | |
download | ydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.h')
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 96 |
1 files changed, 48 insertions, 48 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index c88d941815..5bd8d3902b 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -38,13 +38,13 @@ IStreamAdaptor::TPtr CreateStreamAdaptor(); /////////////////////////////////////////////////////////////////////////////// template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter> -class TGRpcRequestImpl +class TGRpcRequestImpl : public TBaseAsyncContext<TService> , public IQueueEvent , public IRequestContextBase { using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; - + public: using TOnRequest = std::function<void (IRequestContextBase* ctx)>; using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, @@ -52,13 +52,13 @@ public: using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -71,22 +71,22 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); } - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TStreamRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -99,12 +99,12 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); StreamAdaptor_ = CreateStreamAdaptor(); } @@ -138,7 +138,7 @@ public: } } - ~TGRpcRequestImpl() { + ~TGRpcRequestImpl() { // No direct dtor call allowed Y_ASSERT(RefCount() == 0); } @@ -190,10 +190,10 @@ public: WriteByteDataOk(resp); } - void ReplyError(grpc::StatusCode code, const TString& msg) override { + void ReplyError(grpc::StatusCode code, const TString& msg) override { FinishGrpcStatus(code, msg, false); - } - + } + void ReplyUnauthenticated(const TString& in) override { const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); @@ -232,10 +232,10 @@ private: void Clone() { if (!Server_->IsShuttingDown()) { if (RequestCallback_) { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } @@ -254,7 +254,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); @@ -281,7 +281,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { @@ -342,7 +342,7 @@ private: } return resp; }; - GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); if (this->Context.c_call() == nullptr) { @@ -352,7 +352,7 @@ private: } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { // Request cannot be registered due to shutdown // It's unsafe to continue, so drop this request without processing - GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); this->Context.TryCancel(); return false; } @@ -423,7 +423,7 @@ private: } bool SetFinishDone(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); //PrintBackTrace(); DecRequest(); @@ -433,7 +433,7 @@ private: } bool SetFinishError(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); if (!SkipUpdateCountersOnError) { DecRequest(); @@ -479,14 +479,14 @@ private: Server_->DecRequest(); } - using TStateFunc = bool (TThis::*)(bool); + using TStateFunc = bool (TThis::*)(bool); TService* Server_; TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; const char* const Name_; - TLoggerPtr Logger_; - ICounterBlockPtr Counters_; + TLoggerPtr Logger_; + ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; @@ -503,8 +503,8 @@ private: TAuthState AuthState_ = 0; bool RequestRegistered_ = false; - using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; - TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; + using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; + TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; @@ -513,31 +513,31 @@ private: template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> { using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; -public: - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TRequestCallback requestCallback, - const char* name, +public: + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TRequestCallback requestCallback, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter = nullptr) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} - { - } - - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TStreamRequestCallback requestCallback, - const char* name, + { + } + + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TStreamRequestCallback requestCallback, + const char* name, TLoggerPtr logger, - ICounterBlockPtr counters) + ICounterBlockPtr counters) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} - { - } -}; - + { + } +}; + } // namespace NGrpc |