diff options
author | Alexander Gololobov <davenger@yandex-team.com> | 2022-02-10 16:47:38 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:38 +0300 |
commit | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (patch) | |
tree | c0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/grpc/server | |
parent | 39608cdb86363c75ce55b2b9a69841c3b71f22cf (diff) | |
download | ydb-fccc62e9bfdce9be2fe7e0f23479da3a5512211a.tar.gz |
Restoring authorship annotation for Alexander Gololobov <davenger@yandex-team.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 70 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 18 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 18 |
3 files changed, 53 insertions, 53 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index d034f8e2b7..5bd8d3902b 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -59,8 +59,8 @@ public: TRequestCallback requestCallback, const char* name, TLoggerPtr logger, - ICounterBlockPtr counters, - IGRpcRequestLimiterPtr limiter) + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) , Server_(server) , Cb_(cb) @@ -69,7 +69,7 @@ public: , Name_(name) , Logger_(std::move(logger)) , Counters_(std::move(counters)) - , RequestLimiter_(std::move(limiter)) + , RequestLimiter_(std::move(limiter)) , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) { @@ -87,8 +87,8 @@ public: TStreamRequestCallback requestCallback, const char* name, TLoggerPtr logger, - ICounterBlockPtr counters, - IGRpcRequestLimiterPtr limiter) + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) , Server_(server) , Cb_(cb) @@ -97,7 +97,7 @@ public: , Name_(name) , Logger_(std::move(logger)) , Counters_(std::move(counters)) - , RequestLimiter_(std::move(limiter)) + , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) { @@ -363,7 +363,7 @@ private: return false; } - if (IncRequest()) { + if (IncRequest()) { // Adjust counters. RequestSize = Request_->ByteSize(); Counters_->StartProcessing(RequestSize); @@ -405,7 +405,7 @@ private: if (!ok) { logCb(-1); - DecRequest(); + DecRequest(); Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); return false; @@ -426,7 +426,7 @@ private: GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); //PrintBackTrace(); - DecRequest(); + DecRequest(); Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); return false; @@ -436,7 +436,7 @@ private: GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); if (!SkipUpdateCountersOnError) { - DecRequest(); + DecRequest(); Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); } @@ -457,28 +457,28 @@ private: } } - bool IncRequest() { - if (!Server_->IncRequest()) - return false; - - if (!RequestLimiter_) - return true; - - if (!RequestLimiter_->IncRequest()) { - Server_->DecRequest(); - return false; - } - - return true; - } - - void DecRequest() { - if (RequestLimiter_) { - RequestLimiter_->DecRequest(); - } - Server_->DecRequest(); - } - + bool IncRequest() { + if (!Server_->IncRequest()) + return false; + + if (!RequestLimiter_) + return true; + + if (!RequestLimiter_->IncRequest()) { + Server_->DecRequest(); + return false; + } + + return true; + } + + void DecRequest() { + if (RequestLimiter_) { + RequestLimiter_->DecRequest(); + } + Server_->DecRequest(); + } + using TStateFunc = bool (TThis::*)(bool); TService* Server_; TOnRequest Cb_; @@ -487,7 +487,7 @@ private: const char* const Name_; TLoggerPtr Logger_; ICounterBlockPtr Counters_; - IGRpcRequestLimiterPtr RequestLimiter_; + IGRpcRequestLimiterPtr RequestLimiter_; THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; @@ -521,8 +521,8 @@ public: typename TBase::TRequestCallback requestCallback, const char* name, TLoggerPtr logger, - ICounterBlockPtr counters, - IGRpcRequestLimiterPtr limiter = nullptr) + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter = nullptr) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} { } diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 2be84158c7..fcfce1c181 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -28,16 +28,16 @@ struct TAuthState { EAuthState State; }; - -//! An interface that may be used to limit concurrency of requests + +//! An interface that may be used to limit concurrency of requests class IGRpcRequestLimiter: public TThrRefBase { -public: - virtual bool IncRequest() = 0; - virtual void DecRequest() = 0; -}; - -using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; - +public: + virtual bool IncRequest() = 0; + virtual void DecRequest() = 0; +}; + +using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; + //! State of current request class IRequestContextBase: public TThrRefBase { public: diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 4ce67a1b68..d6814a90a0 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,8 +1,8 @@ #pragma once -#include "grpc_request_base.h" +#include "grpc_request_base.h" #include "logger.h" - + #include <library/cpp/threading/future/future.h> #include <util/generic/ptr.h> @@ -123,10 +123,10 @@ public: virtual ~ICancelableContext() = default; }; -template <class TLimit> -class TInFlightLimiterImpl { +template <class TLimit> +class TInFlightLimiterImpl { public: - explicit TInFlightLimiterImpl(const TLimit& limit) + explicit TInFlightLimiterImpl(const TLimit& limit) : Limit_(limit) {} @@ -154,13 +154,13 @@ public: } private: - const TLimit Limit_; + const TLimit Limit_; TAtomic CurInFlightReqs_ = 0; }; -using TGlobalLimiter = TInFlightLimiterImpl<i64>; - - +using TGlobalLimiter = TInFlightLimiterImpl<i64>; + + class IGRpcService: public TThrRefBase { public: virtual grpc::Service* GetService() = 0; |