diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:17 +0300 |
commit | a0ffafe83b7d6229709a32fa942c71d672ac989c (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server | |
parent | c224a621661ddd69699f9476922eb316607ef57e (diff) | |
download | ydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 38 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 96 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 8 |
3 files changed, 71 insertions, 71 deletions
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index 387163cfea..0b6c36c84c 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -6,14 +6,14 @@ namespace NGrpc { -struct ICounterBlock : public TThrRefBase { - virtual void CountNotOkRequest() = 0; - virtual void CountNotOkResponse() = 0; - virtual void CountNotAuthenticated() = 0; - virtual void CountResourceExhausted() = 0; - virtual void CountRequestBytes(ui32 requestSize) = 0; - virtual void CountResponseBytes(ui32 responseSize) = 0; - virtual void StartProcessing(ui32 requestSize) = 0; +struct ICounterBlock : public TThrRefBase { + virtual void CountNotOkRequest() = 0; + virtual void CountNotOkResponse() = 0; + virtual void CountNotAuthenticated() = 0; + virtual void CountResourceExhausted() = 0; + virtual void CountRequestBytes(ui32 requestSize) = 0; + virtual void CountResponseBytes(ui32 responseSize) = 0; + virtual void StartProcessing(ui32 requestSize) = 0; virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; virtual void CountRequestsWithoutDatabase() {} virtual void CountRequestsWithoutToken() {} @@ -21,11 +21,11 @@ struct ICounterBlock : public TThrRefBase { virtual TIntrusivePtr<ICounterBlock> Clone() { return this; } virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } -}; - +}; + using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>; -class TCounterBlock final : public ICounterBlock { +class TCounterBlock final : public ICounterBlock { NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; @@ -36,7 +36,7 @@ class TCounterBlock final : public ICounterBlock { NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; bool Percentile = false; - NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; + NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters; public: @@ -66,31 +66,31 @@ public: } } - void CountNotOkRequest() override { + void CountNotOkRequest() override { NotOkRequestCounter->Inc(); } - void CountNotOkResponse() override { + void CountNotOkResponse() override { NotOkResponseCounter->Inc(); } - void CountNotAuthenticated() override { + void CountNotAuthenticated() override { NotAuthenticated->Inc(); } - void CountResourceExhausted() override { + void CountResourceExhausted() override { ResourceExhausted->Inc(); } - void CountRequestBytes(ui32 requestSize) override { + void CountRequestBytes(ui32 requestSize) override { *RequestBytes += requestSize; } - void CountResponseBytes(ui32 responseSize) override { + void CountResponseBytes(ui32 responseSize) override { *ResponseBytes += responseSize; } - void StartProcessing(ui32 requestSize) override { + void StartProcessing(ui32 requestSize) override { TotalCounter->Inc(); InflyCounter->Inc(); *RequestBytes += requestSize; diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index c88d941815..5bd8d3902b 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -38,13 +38,13 @@ IStreamAdaptor::TPtr CreateStreamAdaptor(); /////////////////////////////////////////////////////////////////////////////// template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter> -class TGRpcRequestImpl +class TGRpcRequestImpl : public TBaseAsyncContext<TService> , public IQueueEvent , public IRequestContextBase { using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; - + public: using TOnRequest = std::function<void (IRequestContextBase* ctx)>; using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, @@ -52,13 +52,13 @@ public: using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -71,22 +71,22 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); } - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TStreamRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -99,12 +99,12 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); StreamAdaptor_ = CreateStreamAdaptor(); } @@ -138,7 +138,7 @@ public: } } - ~TGRpcRequestImpl() { + ~TGRpcRequestImpl() { // No direct dtor call allowed Y_ASSERT(RefCount() == 0); } @@ -190,10 +190,10 @@ public: WriteByteDataOk(resp); } - void ReplyError(grpc::StatusCode code, const TString& msg) override { + void ReplyError(grpc::StatusCode code, const TString& msg) override { FinishGrpcStatus(code, msg, false); - } - + } + void ReplyUnauthenticated(const TString& in) override { const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); @@ -232,10 +232,10 @@ private: void Clone() { if (!Server_->IsShuttingDown()) { if (RequestCallback_) { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } @@ -254,7 +254,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); @@ -281,7 +281,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { @@ -342,7 +342,7 @@ private: } return resp; }; - GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); if (this->Context.c_call() == nullptr) { @@ -352,7 +352,7 @@ private: } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { // Request cannot be registered due to shutdown // It's unsafe to continue, so drop this request without processing - GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); this->Context.TryCancel(); return false; } @@ -423,7 +423,7 @@ private: } bool SetFinishDone(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); //PrintBackTrace(); DecRequest(); @@ -433,7 +433,7 @@ private: } bool SetFinishError(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); if (!SkipUpdateCountersOnError) { DecRequest(); @@ -479,14 +479,14 @@ private: Server_->DecRequest(); } - using TStateFunc = bool (TThis::*)(bool); + using TStateFunc = bool (TThis::*)(bool); TService* Server_; TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; const char* const Name_; - TLoggerPtr Logger_; - ICounterBlockPtr Counters_; + TLoggerPtr Logger_; + ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; @@ -503,8 +503,8 @@ private: TAuthState AuthState_ = 0; bool RequestRegistered_ = false; - using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; - TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; + using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; + TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; @@ -513,31 +513,31 @@ private: template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> { using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; -public: - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TRequestCallback requestCallback, - const char* name, +public: + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TRequestCallback requestCallback, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter = nullptr) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} - { - } - - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TStreamRequestCallback requestCallback, - const char* name, + { + } + + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TStreamRequestCallback requestCallback, + const char* name, TLoggerPtr logger, - ICounterBlockPtr counters) + ICounterBlockPtr counters) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} - { - } -}; - + { + } +}; + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index acf3871569..fcfce1c181 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -60,16 +60,16 @@ public: //! Implementation can swap protobuf message virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; - //! Send serialised response (The request shoult be created for bytes response type) + //! Send serialised response (The request shoult be created for bytes response type) //! Implementation can swap ByteBuffer virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; //! Send grpc UNAUTHENTICATED status virtual void ReplyUnauthenticated(const TString& in) = 0; - //! Send grpc error - virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; - + //! Send grpc error + virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; + //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise virtual TInstant Deadline() const = 0; |