diff options
author | msherbakov <msherbakov@yandex-team.ru> | 2022-02-10 16:49:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:16 +0300 |
commit | c224a621661ddd69699f9476922eb316607ef57e (patch) | |
tree | 33f4d878aa0a9faa964005e06bfab0272313aa71 /library/cpp/grpc | |
parent | 29d0b2eeae154d04156e0698067c0c21a97ea61d (diff) | |
download | ydb-c224a621661ddd69699f9476922eb316607ef57e.tar.gz |
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 4 | ||||
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 4 | ||||
-rw-r--r-- | library/cpp/grpc/client/grpc_common.h | 28 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 38 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 96 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 8 | ||||
-rw-r--r-- | library/cpp/grpc/ya.make | 6 |
7 files changed, 92 insertions, 92 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 73cc908ef8..b2d0ea6eb2 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -15,7 +15,7 @@ #include <netinet/tcp.h> #endif -namespace NGrpc { +namespace NGrpc { void EnableGRpcTracing() { grpc_tracer_set_enabled("tcp", true); @@ -583,4 +583,4 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { } } -} // namespace NGRpc +} // namespace NGRpc diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index ab0a0627be..e2a80e624e 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -21,7 +21,7 @@ * This file contains low level logic for grpc * This file should not be used in high level code without special reason */ -namespace NGrpc { +namespace NGrpc { const size_t DEFAULT_NUM_THREADS = 2; @@ -1396,4 +1396,4 @@ private: std::mutex JoinMutex_; }; -} // namespace NGRpc +} // namespace NGRpc diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h index ffcdafe045..12a3f7c28e 100644 --- a/library/cpp/grpc/client/grpc_common.h +++ b/library/cpp/grpc/client/grpc_common.h @@ -1,7 +1,7 @@ #pragma once #include <grpc++/grpc++.h> -#include <grpc++/resource_quota.h> +#include <grpc++/resource_quota.h> #include <util/datetime/base.h> #include <unordered_map> @@ -9,19 +9,19 @@ constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; -namespace NGrpc { +namespace NGrpc { struct TGRpcClientConfig { TString Locator; // format host:port TDuration Timeout = TDuration::Max(); // request timeout ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size - ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests - ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests + ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests + ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests ui32 MaxInFlight = 0; bool EnableSsl = false; TString SslCaCert; //Implicitly enables Ssl if not empty grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE; - ui64 MemQuota = 0; + ui64 MemQuota = 0; std::unordered_map<TString, TString> StringChannelParams; std::unordered_map<TString, int> IntChannelParams; TString LoadBalancingPolicy = { }; @@ -48,10 +48,10 @@ struct TGRpcClientConfig { inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){ grpc::ChannelArguments args; - args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize); - args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize); + args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize); + args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize); args.SetCompressionAlgorithm(config.CompressionAlgoritm); - + for (const auto& kvp: config.StringChannelParams) { args.SetString(kvp.first, kvp.second); } @@ -60,11 +60,11 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp args.SetInt(kvp.first, kvp.second); } - if (config.MemQuota) { - grpc::ResourceQuota quota; - quota.Resize(config.MemQuota); - args.SetResourceQuota(quota); - } + if (config.MemQuota) { + grpc::ResourceQuota quota; + quota.Resize(config.MemQuota); + args.SetResourceQuota(quota); + } if (mutator) { args.SetSocketMutator(mutator); } @@ -81,4 +81,4 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp } } -} // namespace NGRpc +} // namespace NGRpc diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index 0b6c36c84c..387163cfea 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -6,14 +6,14 @@ namespace NGrpc { -struct ICounterBlock : public TThrRefBase { - virtual void CountNotOkRequest() = 0; - virtual void CountNotOkResponse() = 0; - virtual void CountNotAuthenticated() = 0; - virtual void CountResourceExhausted() = 0; - virtual void CountRequestBytes(ui32 requestSize) = 0; - virtual void CountResponseBytes(ui32 responseSize) = 0; - virtual void StartProcessing(ui32 requestSize) = 0; +struct ICounterBlock : public TThrRefBase { + virtual void CountNotOkRequest() = 0; + virtual void CountNotOkResponse() = 0; + virtual void CountNotAuthenticated() = 0; + virtual void CountResourceExhausted() = 0; + virtual void CountRequestBytes(ui32 requestSize) = 0; + virtual void CountResponseBytes(ui32 responseSize) = 0; + virtual void StartProcessing(ui32 requestSize) = 0; virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; virtual void CountRequestsWithoutDatabase() {} virtual void CountRequestsWithoutToken() {} @@ -21,11 +21,11 @@ struct ICounterBlock : public TThrRefBase { virtual TIntrusivePtr<ICounterBlock> Clone() { return this; } virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } -}; - +}; + using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>; -class TCounterBlock final : public ICounterBlock { +class TCounterBlock final : public ICounterBlock { NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; @@ -36,7 +36,7 @@ class TCounterBlock final : public ICounterBlock { NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; bool Percentile = false; - NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; + NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters; public: @@ -66,31 +66,31 @@ public: } } - void CountNotOkRequest() override { + void CountNotOkRequest() override { NotOkRequestCounter->Inc(); } - void CountNotOkResponse() override { + void CountNotOkResponse() override { NotOkResponseCounter->Inc(); } - void CountNotAuthenticated() override { + void CountNotAuthenticated() override { NotAuthenticated->Inc(); } - void CountResourceExhausted() override { + void CountResourceExhausted() override { ResourceExhausted->Inc(); } - void CountRequestBytes(ui32 requestSize) override { + void CountRequestBytes(ui32 requestSize) override { *RequestBytes += requestSize; } - void CountResponseBytes(ui32 responseSize) override { + void CountResponseBytes(ui32 responseSize) override { *ResponseBytes += responseSize; } - void StartProcessing(ui32 requestSize) override { + void StartProcessing(ui32 requestSize) override { TotalCounter->Inc(); InflyCounter->Inc(); *RequestBytes += requestSize; diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 5bd8d3902b..c88d941815 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -38,13 +38,13 @@ IStreamAdaptor::TPtr CreateStreamAdaptor(); /////////////////////////////////////////////////////////////////////////////// template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter> -class TGRpcRequestImpl +class TGRpcRequestImpl : public TBaseAsyncContext<TService> , public IQueueEvent , public IRequestContextBase { using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; - + public: using TOnRequest = std::function<void (IRequestContextBase* ctx)>; using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, @@ -52,13 +52,13 @@ public: using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -71,22 +71,22 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); } - TGRpcRequestImpl(TService* server, + TGRpcRequestImpl(TService* server, typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq, TOnRequest cb, TStreamRequestCallback requestCallback, const char* name, - TLoggerPtr logger, + TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) : TBaseAsyncContext<TService>(service, cq) @@ -99,12 +99,12 @@ public: , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) - , StateFunc_(&TThis::SetRequestDone) + , StateFunc_(&TThis::SetRequestDone) { AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); Y_VERIFY(Request_); - GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); StreamAdaptor_ = CreateStreamAdaptor(); } @@ -138,7 +138,7 @@ public: } } - ~TGRpcRequestImpl() { + ~TGRpcRequestImpl() { // No direct dtor call allowed Y_ASSERT(RefCount() == 0); } @@ -190,10 +190,10 @@ public: WriteByteDataOk(resp); } - void ReplyError(grpc::StatusCode code, const TString& msg) override { + void ReplyError(grpc::StatusCode code, const TString& msg) override { FinishGrpcStatus(code, msg, false); - } - + } + void ReplyUnauthenticated(const TString& in) override { const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); @@ -232,10 +232,10 @@ private: void Clone() { if (!Server_->IsShuttingDown()) { if (RequestCallback_) { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } else { - MakeIntrusive<TThis>( + MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); } } @@ -254,7 +254,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Y_VERIFY(this->Context.c_call()); Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); @@ -281,7 +281,7 @@ private: if (Writer_) { GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, this->Context.peer().c_str()); - StateFunc_ = &TThis::SetFinishDone; + StateFunc_ = &TThis::SetFinishDone; ResponseSize = sz; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); } else { @@ -342,7 +342,7 @@ private: } return resp; }; - GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); if (this->Context.c_call() == nullptr) { @@ -352,7 +352,7 @@ private: } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { // Request cannot be registered due to shutdown // It's unsafe to continue, so drop this request without processing - GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); + GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); this->Context.TryCancel(); return false; } @@ -423,7 +423,7 @@ private: } bool SetFinishDone(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); //PrintBackTrace(); DecRequest(); @@ -433,7 +433,7 @@ private: } bool SetFinishError(bool ok) { - GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, + GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, ok ? "true" : "false", this->Context.peer().c_str()); if (!SkipUpdateCountersOnError) { DecRequest(); @@ -479,14 +479,14 @@ private: Server_->DecRequest(); } - using TStateFunc = bool (TThis::*)(bool); + using TStateFunc = bool (TThis::*)(bool); TService* Server_; TOnRequest Cb_; TRequestCallback RequestCallback_; TStreamRequestCallback StreamRequestCallback_; const char* const Name_; - TLoggerPtr Logger_; - ICounterBlockPtr Counters_; + TLoggerPtr Logger_; + ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; @@ -503,8 +503,8 @@ private: TAuthState AuthState_ = 0; bool RequestRegistered_ = false; - using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; - TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; + using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; + TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; @@ -513,31 +513,31 @@ private: template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> { using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; -public: - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TRequestCallback requestCallback, - const char* name, +public: + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TRequestCallback requestCallback, + const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter = nullptr) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} - { - } - - TGRpcRequest(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - typename TBase::TOnRequest cb, - typename TBase::TStreamRequestCallback requestCallback, - const char* name, + { + } + + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TStreamRequestCallback requestCallback, + const char* name, TLoggerPtr logger, - ICounterBlockPtr counters) + ICounterBlockPtr counters) : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} - { - } -}; - + { + } +}; + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index fcfce1c181..acf3871569 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -60,16 +60,16 @@ public: //! Implementation can swap protobuf message virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; - //! Send serialised response (The request shoult be created for bytes response type) + //! Send serialised response (The request shoult be created for bytes response type) //! Implementation can swap ByteBuffer virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; //! Send grpc UNAUTHENTICATED status virtual void ReplyUnauthenticated(const TString& in) = 0; - //! Send grpc error - virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; - + //! Send grpc error + virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; + //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise virtual TInstant Deadline() const = 0; diff --git a/library/cpp/grpc/ya.make b/library/cpp/grpc/ya.make index 3635124115..9239947ad1 100644 --- a/library/cpp/grpc/ya.make +++ b/library/cpp/grpc/ya.make @@ -1,5 +1,5 @@ -RECURSE( - client +RECURSE( + client common server -) +) |