diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/server | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/server')
19 files changed, 2127 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/actors/logger.cpp b/library/cpp/grpc/server/actors/logger.cpp new file mode 100644 index 0000000000..d8b2042576 --- /dev/null +++ b/library/cpp/grpc/server/actors/logger.cpp @@ -0,0 +1,45 @@ +#include "logger.h" + +namespace NGrpc { +namespace { + +static_assert( + ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) && + ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG), + "log levels in the library/log and library/cpp/actors don't match"); + +class TActorSystemLogger final: public TLogger { +public: + TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept + : ActorSystem_{as} + , Component_{component} + { + } + +protected: + bool DoIsEnabled(ELogPriority p) const noexcept override { + const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings()); + const auto priority = static_cast<::NActors::NLog::EPriority>(p); + + return settings && settings->Satisfies(priority, Component_, 0); + } + + void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override { + Y_VERIFY_DEBUG(DoIsEnabled(p)); + + const auto priority = static_cast<::NActors::NLog::EPriority>(p); + ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args); + } + +private: + NActors::TActorSystem& ActorSystem_; + NActors::NLog::EComponent Component_; +}; + +} // namespace + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) { + return MakeIntrusive<TActorSystemLogger>(as, component); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/logger.h b/library/cpp/grpc/server/actors/logger.h new file mode 100644 index 0000000000..abf9270f7b --- /dev/null +++ b/library/cpp/grpc/server/actors/logger.h @@ -0,0 +1,11 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/grpc/server/logger.h> + +namespace NGrpc { + +TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/actors/ya.make b/library/cpp/grpc/server/actors/ya.make new file mode 100644 index 0000000000..6c9d80aa45 --- /dev/null +++ b/library/cpp/grpc/server/actors/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +OWNER(g:kikimr g:solomon) + +SRCS( + logger.cpp +) + +PEERDIR( + library/cpp/actors/core +) + +END() diff --git a/library/cpp/grpc/server/event_callback.cpp b/library/cpp/grpc/server/event_callback.cpp new file mode 100644 index 0000000000..f423836bd6 --- /dev/null +++ b/library/cpp/grpc/server/event_callback.cpp @@ -0,0 +1 @@ +#include "event_callback.h" diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h new file mode 100644 index 0000000000..d0b700b3c9 --- /dev/null +++ b/library/cpp/grpc/server/event_callback.h @@ -0,0 +1,80 @@ +#pragma once + +#include "grpc_server.h" + +namespace NGrpc { + +enum class EQueueEventStatus { + OK, + ERROR +}; + +template<class TCallback> +class TQueueEventCallback: public IQueueEvent { +public: + TQueueEventCallback(const TCallback& callback) + : Callback(callback) + {} + + TQueueEventCallback(TCallback&& callback) + : Callback(std::move(callback)) + {} + + bool Execute(bool ok) override { + Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); + return false; + } + + void DestroyRequest() override { + delete this; + } + +private: + TCallback Callback; +}; + +// Implementation of IQueueEvent that reduces allocations +template<class TSelf> +class TQueueFixedEvent: private IQueueEvent { + using TCallback = void (TSelf::*)(EQueueEventStatus); + +public: + TQueueFixedEvent(TSelf* self, TCallback callback) + : Self(self) + , Callback(callback) + { } + + IQueueEvent* Prepare() { + Self->Ref(); + return this; + } + +private: + bool Execute(bool ok) override { + ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); + return false; + } + + void DestroyRequest() override { + Self->UnRef(); + } + +private: + TSelf* const Self; + TCallback const Callback; +}; + +template<class TCallback> +inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) { + return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback)); +} + +template<class T> +inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) { + using TPtr = TIntrusivePtr<T>; + return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) { + ((*self).*method)(status); + }); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h new file mode 100644 index 0000000000..51356d4ce5 --- /dev/null +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -0,0 +1,94 @@ +#pragma once + +#include "grpc_server.h" + +#include <util/generic/vector.h> +#include <util/generic/string.h> +#include <util/system/yassert.h> +#include <util/generic/set.h> + +#include <grpc++/server.h> +#include <grpc++/server_context.h> + +#include <chrono> + +namespace NGrpc { + +template<typename TService> +class TBaseAsyncContext: public ICancelableContext { +public: + TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) + : Service(service) + , CQ(cq) + { + } + + TString GetPeerName() const { + return TString(Context.peer()); + } + + TInstant Deadline() const { + // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline + // right before the request is getting to be send. + // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + // + // After this timeout calculated back to the deadline on the server side + // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). + // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME + // + + std::chrono::system_clock::time_point t = Context.deadline(); + if (t == std::chrono::system_clock::time_point::max()) { + return TInstant::Max(); + } + auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); + return TInstant::MicroSeconds(us.time_since_epoch().count()); + } + + TSet<TStringBuf> GetPeerMetaKeys() const { + TSet<TStringBuf> keys; + for (const auto& [key, _]: Context.client_metadata()) { + keys.emplace(key.data(), key.size()); + } + return keys; + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { + const auto& clientMetadata = Context.client_metadata(); + const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); + if (range.first == range.second) { + return {}; + } + + TVector<TStringBuf> values; + values.reserve(std::distance(range.first, range.second)); + + for (auto it = range.first; it != range.second; ++it) { + values.emplace_back(it->second.data(), it->second.size()); + } + return values; + } + + grpc_compression_level GetCompressionLevel() const { + return Context.compression_level(); + } + + void Shutdown() override { + // Shutdown may only be called after request has started successfully + if (Context.c_call()) + Context.TryCancel(); + } + +protected: + //! The means of communication with the gRPC runtime for an asynchronous + //! server. + typename TService::TCurrentGRpcService::AsyncService* const Service; + //! The producer-consumer queue where for asynchronous server notifications. + grpc::ServerCompletionQueue* const CQ; + //! Context for the rpc, allowing to tweak aspects of it such as the use + //! of compression, authentication, as well as to send metadata back to the + //! client. + grpc::ServerContext Context; +}; + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp new file mode 100644 index 0000000000..fa96e0100b --- /dev/null +++ b/library/cpp/grpc/server/grpc_counters.cpp @@ -0,0 +1,45 @@ +#include "grpc_counters.h" + +namespace NGrpc { +namespace { + +class TFakeCounterBlock final: public ICounterBlock { +private: + void CountNotOkRequest() override { + } + + void CountNotOkResponse() override { + } + + void CountNotAuthenticated() override { + } + + void CountResourceExhausted() override { + } + + void CountRequestBytes(ui32 /*requestSize*/) override { + } + + void CountResponseBytes(ui32 /*responseSize*/) override { + } + + void StartProcessing(ui32 /*requestSize*/) override { + } + + void FinishProcessing( + ui32 /*requestSize*/, + ui32 /*responseSize*/, + bool /*ok*/, + ui32 /*status*/, + TDuration /*requestDuration*/) override + { + } +}; + +} // namespace + +ICounterBlockPtr FakeCounterBlock() { + return MakeIntrusive<TFakeCounterBlock>(); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h new file mode 100644 index 0000000000..0b6c36c84c --- /dev/null +++ b/library/cpp/grpc/server/grpc_counters.h @@ -0,0 +1,136 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <util/generic/ptr.h> + +namespace NGrpc { + +struct ICounterBlock : public TThrRefBase { + virtual void CountNotOkRequest() = 0; + virtual void CountNotOkResponse() = 0; + virtual void CountNotAuthenticated() = 0; + virtual void CountResourceExhausted() = 0; + virtual void CountRequestBytes(ui32 requestSize) = 0; + virtual void CountResponseBytes(ui32 responseSize) = 0; + virtual void StartProcessing(ui32 requestSize) = 0; + virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; + virtual void CountRequestsWithoutDatabase() {} + virtual void CountRequestsWithoutToken() {} + virtual void CountRequestWithoutTls() {} + + virtual TIntrusivePtr<ICounterBlock> Clone() { return this; } + virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } +}; + +using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>; + +class TCounterBlock final : public ICounterBlock { + NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; + NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; + NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; + NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter; + NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes; + NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes; + NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; + NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; + bool Percentile = false; + NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; + std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters; + +public: + TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter, + NMonitoring::TDynamicCounters::TCounterPtr inflyCounter, + NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter, + NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter, + NMonitoring::TDynamicCounters::TCounterPtr requestBytes, + NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes, + NMonitoring::TDynamicCounters::TCounterPtr responseBytes, + NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated, + NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted, + TIntrusivePtr<NMonitoring::TDynamicCounters> group) + : TotalCounter(std::move(totalCounter)) + , InflyCounter(std::move(inflyCounter)) + , NotOkRequestCounter(std::move(notOkRequestCounter)) + , NotOkResponseCounter(std::move(notOkResponseCounter)) + , RequestBytes(std::move(requestBytes)) + , InflyRequestBytes(std::move(inflyRequestBytes)) + , ResponseBytes(std::move(responseBytes)) + , NotAuthenticated(std::move(notAuthenticated)) + , ResourceExhausted(std::move(resourceExhausted)) + { + if (group) { + RequestHistMs.Initialize(group, "event", "request", "ms", {0.5f, 0.9f, 0.99f, 0.999f, 1.0f}); + Percentile = true; + } + } + + void CountNotOkRequest() override { + NotOkRequestCounter->Inc(); + } + + void CountNotOkResponse() override { + NotOkResponseCounter->Inc(); + } + + void CountNotAuthenticated() override { + NotAuthenticated->Inc(); + } + + void CountResourceExhausted() override { + ResourceExhausted->Inc(); + } + + void CountRequestBytes(ui32 requestSize) override { + *RequestBytes += requestSize; + } + + void CountResponseBytes(ui32 responseSize) override { + *ResponseBytes += responseSize; + } + + void StartProcessing(ui32 requestSize) override { + TotalCounter->Inc(); + InflyCounter->Inc(); + *RequestBytes += requestSize; + *InflyRequestBytes += requestSize; + } + + void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, + TDuration requestDuration) override + { + Y_UNUSED(status); + + InflyCounter->Dec(); + *InflyRequestBytes -= requestSize; + *ResponseBytes += responseSize; + if (!ok) { + NotOkResponseCounter->Inc(); + } + if (Percentile) { + RequestHistMs.Increment(requestDuration.MilliSeconds()); + } + } + + ICounterBlockPtr Clone() override { + return this; + } + + void Update() { + if (Percentile) { + RequestHistMs.Update(); + } + } +}; + +using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; + +/** + * Creates new instance of ICounterBlock implementation which does nothing. + * + * @return new instance + */ +ICounterBlockPtr FakeCounterBlock(); + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp new file mode 100644 index 0000000000..d18a32776f --- /dev/null +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -0,0 +1,59 @@ +#include "grpc_request.h" + +namespace NGrpc { + +const char* GRPC_USER_AGENT_HEADER = "user-agent"; + +class TStreamAdaptor: public IStreamAdaptor { +public: + TStreamAdaptor() + : StreamIsReady_(true) + {} + + void Enqueue(std::function<void()>&& fn, bool urgent) override { + with_lock(Mtx_) { + if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { + Y_VERIFY(!StreamIsReady_); + } + auto& queue = urgent ? UrgentQueue_ : NormalQueue_; + if (StreamIsReady_ && queue.empty()) { + StreamIsReady_ = false; + } else { + queue.push_back(std::move(fn)); + return; + } + } + fn(); + } + + size_t ProcessNext() override { + size_t left = 0; + std::function<void()> fn; + with_lock(Mtx_) { + Y_VERIFY(!StreamIsReady_); + auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; + if (queue.empty()) { + // Both queues are empty + StreamIsReady_ = true; + } else { + fn = std::move(queue.front()); + queue.pop_front(); + left = UrgentQueue_.size() + NormalQueue_.size(); + } + } + if (fn) + fn(); + return left; + } +private: + bool StreamIsReady_; + TList<std::function<void()>> NormalQueue_; + TList<std::function<void()>> UrgentQueue_; + TMutex Mtx_; +}; + +IStreamAdaptor::TPtr CreateStreamAdaptor() { + return std::make_unique<TStreamAdaptor>(); +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h new file mode 100644 index 0000000000..5bd8d3902b --- /dev/null +++ b/library/cpp/grpc/server/grpc_request.h @@ -0,0 +1,543 @@ +#pragma once + +#include <google/protobuf/text_format.h> +#include <google/protobuf/arena.h> +#include <google/protobuf/message.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/logger/priority.h> + +#include "grpc_response.h" +#include "event_callback.h" +#include "grpc_async_ctx_base.h" +#include "grpc_counters.h" +#include "grpc_request_base.h" +#include "grpc_server.h" +#include "logger.h" + +#include <util/system/hp_timer.h> + +#include <grpc++/server.h> +#include <grpc++/server_context.h> +#include <grpc++/support/async_stream.h> +#include <grpc++/support/async_unary_call.h> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/impl/codegen/async_stream.h> + +namespace NGrpc { + +class IStreamAdaptor { +public: + using TPtr = std::unique_ptr<IStreamAdaptor>; + virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0; + virtual size_t ProcessNext() = 0; + virtual ~IStreamAdaptor() = default; +}; + +IStreamAdaptor::TPtr CreateStreamAdaptor(); + +/////////////////////////////////////////////////////////////////////////////// +template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter> +class TGRpcRequestImpl + : public TBaseAsyncContext<TService> + , public IQueueEvent + , public IRequestContextBase +{ + using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; + +public: + using TOnRequest = std::function<void (IRequestContextBase* ctx)>; + using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, + grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, + grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + + TGRpcRequestImpl(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + TOnRequest cb, + TRequestCallback requestCallback, + const char* name, + TLoggerPtr logger, + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter) + : TBaseAsyncContext<TService>(service, cq) + , Server_(server) + , Cb_(cb) + , RequestCallback_(requestCallback) + , StreamRequestCallback_(nullptr) + , Name_(name) + , Logger_(std::move(logger)) + , Counters_(std::move(counters)) + , RequestLimiter_(std::move(limiter)) + , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) + , StateFunc_(&TThis::SetRequestDone) + { + AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); + Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); + Y_VERIFY(Request_); + GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); + FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); + } + + TGRpcRequestImpl(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + TOnRequest cb, + TStreamRequestCallback requestCallback, + const char* name, + TLoggerPtr logger, + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter) + : TBaseAsyncContext<TService>(service, cq) + , Server_(server) + , Cb_(cb) + , RequestCallback_(nullptr) + , StreamRequestCallback_(requestCallback) + , Name_(name) + , Logger_(std::move(logger)) + , Counters_(std::move(counters)) + , RequestLimiter_(std::move(limiter)) + , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) + , StateFunc_(&TThis::SetRequestDone) + { + AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); + Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); + Y_VERIFY(Request_); + GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); + FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); + StreamAdaptor_ = CreateStreamAdaptor(); + } + + TAsyncFinishResult GetFinishFuture() override { + return FinishPromise_.GetFuture(); + } + + TString GetPeer() const override { + return TString(this->Context.peer()); + } + + bool SslServer() const override { + return Server_->SslServer(); + } + + void Run() { + // Start request unless server is shutting down + if (auto guard = Server_->ProtectShutdown()) { + Ref(); //For grpc c runtime + this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare()); + if (RequestCallback_) { + (this->Service->*RequestCallback_) + (&this->Context, Request_, + reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag()); + } else { + (this->Service->*StreamRequestCallback_) + (&this->Context, Request_, + reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag()); + } + } + } + + ~TGRpcRequestImpl() { + // No direct dtor call allowed + Y_ASSERT(RefCount() == 0); + } + + bool Execute(bool ok) override { + return (this->*StateFunc_)(ok); + } + + void DestroyRequest() override { + if (RequestRegistered_) { + Server_->DeregisterRequestCtx(this); + RequestRegistered_ = false; + } + UnRef(); + } + + TInstant Deadline() const override { + return TBaseAsyncContext<TService>::Deadline(); + } + + TSet<TStringBuf> GetPeerMetaKeys() const override { + return TBaseAsyncContext<TService>::GetPeerMetaKeys(); + } + + TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { + return TBaseAsyncContext<TService>::GetPeerMetaValues(key); + } + + grpc_compression_level GetCompressionLevel() const override { + return TBaseAsyncContext<TService>::GetCompressionLevel(); + } + + //! Get pointer to the request's message. + const NProtoBuf::Message* GetRequest() const override { + return Request_; + } + + TAuthState& GetAuthState() override { + return AuthState_; + } + + void Reply(NProtoBuf::Message* resp, ui32 status) override { + ResponseStatus = status; + WriteDataOk(resp); + } + + void Reply(grpc::ByteBuffer* resp, ui32 status) override { + ResponseStatus = status; + WriteByteDataOk(resp); + } + + void ReplyError(grpc::StatusCode code, const TString& msg) override { + FinishGrpcStatus(code, msg, false); + } + + void ReplyUnauthenticated(const TString& in) override { + const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; + FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); + } + + void SetNextReplyCallback(TOnNextReply&& cb) override { + NextReplyCb_ = cb; + } + + void AddTrailingMetadata(const TString& key, const TString& value) override { + this->Context.AddTrailingMetadata(key, value); + } + + void FinishStreamingOk() override { + GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_, + this->Context.peer().c_str()); + auto cb = [this]() { + StateFunc_ = &TThis::SetFinishDone; + GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_, + this->Context.peer().c_str()); + + StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + + google::protobuf::Arena* GetArena() override { + return &Arena_; + } + + void UseDatabase(const TString& database) override { + Counters_->UseDatabase(database); + } + +private: + void Clone() { + if (!Server_->IsShuttingDown()) { + if (RequestCallback_) { + MakeIntrusive<TThis>( + Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + } else { + MakeIntrusive<TThis>( + Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); + } + } + } + + void WriteDataOk(NProtoBuf::Message* resp) { + auto makeResponseString = [&] { + TString x; + TOutProtoPrinter printer; + printer.SetSingleLineMode(true); + printer.PrintToString(*resp, &x); + return x; + }; + + auto sz = (size_t)resp->ByteSize(); + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, + makeResponseString().data(), this->Context.peer().c_str()); + StateFunc_ = &TThis::SetFinishDone; + ResponseSize = sz; + Y_VERIFY(this->Context.c_call()); + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", + this, Name_, makeResponseString().data(), this->Context.peer().c_str()); + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid message copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", + this, Name_, makeResponseString().data(), this->Context.peer().c_str()); + StateFunc_ = &TThis::NextReply; + ResponseSize += sz; + StreamWriter_->Write(*uResp, GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + } + + void WriteByteDataOk(grpc::ByteBuffer* resp) { + auto sz = resp->Length(); + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, + this->Context.peer().c_str()); + StateFunc_ = &TThis::SetFinishDone; + ResponseSize = sz; + Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, + this->Context.peer().c_str()); + + // because of std::function cannot hold move-only captured object + // we allocate shared object on heap to avoid buffer copy + auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); + auto cb = [this, uResp = std::move(uResp), sz]() { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", + this, Name_, this->Context.peer().c_str()); + StateFunc_ = &TThis::NextReply; + ResponseSize += sz; + StreamWriter_->Write(*uResp, GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + } + + void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) { + Y_VERIFY(code != grpc::OK); + if (code == grpc::StatusCode::UNAUTHENTICATED) { + Counters_->CountNotAuthenticated(); + } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) { + Counters_->CountResourceExhausted(); + } + + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, + Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); + StateFunc_ = &TThis::SetFinishError; + TOut resp; + Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" + " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); + auto cb = [this, code, msg]() { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" + " (pushed to grpc)", this, Name_, msg.c_str(), + this->Context.peer().c_str(), (int)code); + StateFunc_ = &TThis::SetFinishError; + StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), urgent); + } + } + + bool SetRequestDone(bool ok) { + auto makeRequestString = [&] { + TString resp; + if (ok) { + TInProtoPrinter printer; + printer.SetSingleLineMode(true); + printer.PrintToString(*Request_, &resp); + } else { + resp = "<not ok>"; + } + return resp; + }; + GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, + ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); + + if (this->Context.c_call() == nullptr) { + Y_VERIFY(!ok); + // One ref by OnFinishTag, grpc will not call this tag if no request received + UnRef(); + } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { + // Request cannot be registered due to shutdown + // It's unsafe to continue, so drop this request without processing + GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); + this->Context.TryCancel(); + return false; + } + + Clone(); // TODO: Request pool? + if (!ok) { + Counters_->CountNotOkRequest(); + return false; + } + + if (IncRequest()) { + // Adjust counters. + RequestSize = Request_->ByteSize(); + Counters_->StartProcessing(RequestSize); + RequestTimer.Reset(); + + if (!SslServer()) { + Counters_->CountRequestWithoutTls(); + } + + //TODO: Move this in to grpc_request_proxy + auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database")); + if (maybeDatabase.empty()) { + Counters_->CountRequestsWithoutDatabase(); + } + auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket")); + if (maybeToken.empty() || maybeToken[0].empty()) { + TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; + Counters_->CountRequestsWithoutToken(); + GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " + "Name# %s data# %s peer# %s database# %s", this, Name_, + makeRequestString().data(), this->Context.peer().c_str(), db.c_str()); + } + + // Handle current request. + Cb_(this); + } else { + //This request has not been counted + SkipUpdateCountersOnError = true; + FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true); + } + return true; + } + + bool NextReply(bool ok) { + auto logCb = [this, ok](int left) { + GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_, + ok ? "true" : "false", this->Context.peer().c_str(), left); + }; + + if (!ok) { + logCb(-1); + DecRequest(); + Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, + TDuration::Seconds(RequestTimer.Passed())); + return false; + } + + Ref(); // To prevent destroy during this call in case of execution Finish + size_t left = StreamAdaptor_->ProcessNext(); + logCb(left); + if (NextReplyCb_) { + NextReplyCb_(left); + } + // Now it is safe to destroy even if Finish was called + UnRef(); + return true; + } + + bool SetFinishDone(bool ok) { + GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, + ok ? "true" : "false", this->Context.peer().c_str()); + //PrintBackTrace(); + DecRequest(); + Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, + TDuration::Seconds(RequestTimer.Passed())); + return false; + } + + bool SetFinishError(bool ok) { + GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, + ok ? "true" : "false", this->Context.peer().c_str()); + if (!SkipUpdateCountersOnError) { + DecRequest(); + Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, + TDuration::Seconds(RequestTimer.Passed())); + } + return false; + } + + // Returns pointer to IQueueEvent to pass into grpc c runtime + // Implicit C style cast from this to void* is wrong due to multiple inheritance + void* GetGRpcTag() { + return static_cast<IQueueEvent*>(this); + } + + void OnFinish(EQueueEventStatus evStatus) { + if (this->Context.IsCancelled()) { + FinishPromise_.SetValue(EFinishStatus::CANCEL); + } else { + FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); + } + } + + bool IncRequest() { + if (!Server_->IncRequest()) + return false; + + if (!RequestLimiter_) + return true; + + if (!RequestLimiter_->IncRequest()) { + Server_->DecRequest(); + return false; + } + + return true; + } + + void DecRequest() { + if (RequestLimiter_) { + RequestLimiter_->DecRequest(); + } + Server_->DecRequest(); + } + + using TStateFunc = bool (TThis::*)(bool); + TService* Server_; + TOnRequest Cb_; + TRequestCallback RequestCallback_; + TStreamRequestCallback StreamRequestCallback_; + const char* const Name_; + TLoggerPtr Logger_; + ICounterBlockPtr Counters_; + IGRpcRequestLimiterPtr RequestLimiter_; + + THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; + THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; + TStateFunc StateFunc_; + TIn* Request_; + + google::protobuf::Arena Arena_; + TOnNextReply NextReplyCb_; + ui32 RequestSize = 0; + ui32 ResponseSize = 0; + ui32 ResponseStatus = 0; + THPTimer RequestTimer; + TAuthState AuthState_ = 0; + bool RequestRegistered_ = false; + + using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; + TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; + NThreading::TPromise<EFinishStatus> FinishPromise_; + bool SkipUpdateCountersOnError = false; + IStreamAdaptor::TPtr StreamAdaptor_; +}; + +template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> +class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> { + using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; +public: + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TRequestCallback requestCallback, + const char* name, + TLoggerPtr logger, + ICounterBlockPtr counters, + IGRpcRequestLimiterPtr limiter = nullptr) + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)} + { + } + + TGRpcRequest(TService* server, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + typename TBase::TOnRequest cb, + typename TBase::TStreamRequestCallback requestCallback, + const char* name, + TLoggerPtr logger, + ICounterBlockPtr counters) + : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr} + { + } +}; + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h new file mode 100644 index 0000000000..fcfce1c181 --- /dev/null +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -0,0 +1,116 @@ +#pragma once + +#include <google/protobuf/message.h> +#include <library/cpp/threading/future/future.h> + +#include <grpc++/server_context.h> + +namespace grpc { +class ByteBuffer; +} + +namespace NGrpc { + +extern const char* GRPC_USER_AGENT_HEADER; + +struct TAuthState { + enum EAuthState { + AS_NOT_PERFORMED, + AS_OK, + AS_FAIL, + AS_UNAVAILABLE + }; + TAuthState(bool needAuth) + : NeedAuth(needAuth) + , State(AS_NOT_PERFORMED) + {} + bool NeedAuth; + EAuthState State; +}; + + +//! An interface that may be used to limit concurrency of requests +class IGRpcRequestLimiter: public TThrRefBase { +public: + virtual bool IncRequest() = 0; + virtual void DecRequest() = 0; +}; + +using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; + +//! State of current request +class IRequestContextBase: public TThrRefBase { +public: + enum class EFinishStatus { + OK, + ERROR, + CANCEL + }; + using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>; + + using TOnNextReply = std::function<void (size_t left)>; + + //! Get pointer to the request's message. + virtual const NProtoBuf::Message* GetRequest() const = 0; + + //! Get current auth state + virtual TAuthState& GetAuthState() = 0; + + //! Send common response (The request shoult be created for protobuf response type) + //! Implementation can swap protobuf message + virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; + + //! Send serialised response (The request shoult be created for bytes response type) + //! Implementation can swap ByteBuffer + virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; + + //! Send grpc UNAUTHENTICATED status + virtual void ReplyUnauthenticated(const TString& in) = 0; + + //! Send grpc error + virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; + + //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise + virtual TInstant Deadline() const = 0; + + //! Returns available peer metadata keys + virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; + + //! Returns peer optional metavalue + virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0; + + //! Returns request compression level + virtual grpc_compression_level GetCompressionLevel() const = 0; + + //! Returns protobuf arena allocator associated with current request + //! Lifetime of the arena is lifetime of the context + virtual google::protobuf::Arena* GetArena() = 0; + + //! Add trailing metadata in to grpc context + //! The metadata will be send at the time of rpc finish + virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0; + + //! Use validated database name for counters + virtual void UseDatabase(const TString& database) = 0; + + // Streaming part + + //! Set callback. The callback will be called when response deliverid to the client + //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one + //! reply in flight + virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0; + + //! Finish streaming reply + virtual void FinishStreamingOk() = 0; + + //! Returns future to get cancel of finish notification + virtual TAsyncFinishResult GetFinishFuture() = 0; + + //! Returns peer address + virtual TString GetPeer() const = 0; + + //! Returns true if server is using ssl + virtual bool SslServer() const = 0; +}; + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h new file mode 100644 index 0000000000..8e9afe44d5 --- /dev/null +++ b/library/cpp/grpc/server/grpc_response.h @@ -0,0 +1,90 @@ +#pragma once + +#include <grpc++/impl/codegen/byte_buffer.h> +#include <grpc++/impl/codegen/proto_utils.h> + +#include <variant> + +namespace NGrpc { + +/** + * Universal response that owns underlying message or buffer. + */ +template <typename TMsg> +class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { + friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>; + +public: + explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept + : Data_{TMsg{}} + { + std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg)); + } + + explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept + : Data_{grpc::ByteBuffer{}} + { + std::get<grpc::ByteBuffer>(Data_).Swap(buffer); + } + +private: + std::variant<TMsg, grpc::ByteBuffer> Data_; +}; + +/** + * Universal response that only keeps reference to underlying message or buffer. + */ +template <typename TMsg> +class TUniversalResponseRef: private TMoveOnly { + friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>; + +public: + explicit TUniversalResponseRef(const NProtoBuf::Message* msg) + : Data_{msg} + { + } + + explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer) + : Data_{buffer} + { + } + +private: + std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_; +}; + +} // namespace NGrpc + +namespace grpc { + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> { +public: + static Status Serialize( + const NGrpc::TUniversalResponse<TMsg>& resp, + ByteBuffer* buffer, + bool* ownBuffer) + { + return std::visit([&](const auto& data) { + using T = std::decay_t<decltype(data)>; + return SerializationTraits<T>::Serialize(data, buffer, ownBuffer); + }, resp.Data_); + } +}; + +template <typename TMsg> +class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> { +public: + static Status Serialize( + const NGrpc::TUniversalResponseRef<TMsg>& resp, + ByteBuffer* buffer, + bool* ownBuffer) + { + return std::visit([&](const auto* data) { + using T = std::decay_t<std::remove_pointer_t<decltype(data)>>; + return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer); + }, resp.Data_); + } +}; + +} // namespace grpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp new file mode 100644 index 0000000000..7437b7a8f5 --- /dev/null +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -0,0 +1,240 @@ +#include "grpc_server.h" + +#include <util/string/join.h> +#include <util/generic/yexception.h> +#include <util/system/thread.h> + +#include <grpc++/resource_quota.h> +#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> + +#if !defined(_WIN32) && !defined(_WIN64) + +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +#endif + +namespace NGrpc { + +using NThreading::TFuture; + +static void PullEvents(grpc::ServerCompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_server"); + while (true) { + void* tag; // uniquely identifies a request. + bool ok; + + if (cq->Next(&tag, &ok)) { + IQueueEvent* const ev(static_cast<IQueueEvent*>(tag)); + + if (!ev->Execute(ok)) { + ev->DestroyRequest(); + } + } else { + break; + } + } +} + +TGRpcServer::TGRpcServer(const TServerOptions& opts) + : Options_(opts) + , Limiter_(Options_.MaxGlobalRequestInFlight) + {} + +TGRpcServer::~TGRpcServer() { + Y_VERIFY(Ts.empty()); + Services_.clear(); +} + +void TGRpcServer::AddService(IGRpcServicePtr service) { + Services_.push_back(service); +} + +void TGRpcServer::Start() { + TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695 + using grpc::ServerBuilder; + using grpc::ResourceQuota; + ServerBuilder builder; + auto credentials = grpc::InsecureServerCredentials(); + if (Options_.SslData) { + grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; + keycert.cert_chain = std::move(Options_.SslData->Cert); + keycert.private_key = std::move(Options_.SslData->Key); + grpc::SslServerCredentialsOptions sslOps; + sslOps.pem_root_certs = std::move(Options_.SslData->Root); + sslOps.pem_key_cert_pairs.push_back(keycert); + credentials = grpc::SslServerCredentials(sslOps); + } + if (Options_.ExternalListener) { + Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor( + ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD, + credentials + )); + } else { + builder.AddListeningPort(server_address, credentials); + } + builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); + builder.SetMaxSendMessageSize(Options_.MaxMessageSize); + for (IGRpcServicePtr service : Services_) { + service->SetServerOptions(Options_); + builder.RegisterService(service->GetService()); + service->SetGlobalLimiterHandle(&Limiter_); + } + + class TKeepAliveOption: public grpc::ServerBuilderOption { + public: + TKeepAliveOption(int idle, int interval) + : Idle(idle) + , Interval(interval) + , KeepAliveEnabled(true) + {} + + TKeepAliveOption() + : Idle(0) + , Interval(0) + , KeepAliveEnabled(false) + {} + + void UpdateArguments(grpc::ChannelArguments *args) override { + args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); + args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); + if (KeepAliveEnabled) { + args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); + } + } + + void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override + {} + private: + const int Idle; + const int Interval; + const bool KeepAliveEnabled; + }; + + if (Options_.KeepAliveEnable) { + builder.SetOption(std::make_unique<TKeepAliveOption>( + Options_.KeepAliveIdleTimeoutTriggerSec, + Options_.KeepAliveProbeIntervalSec)); + } else { + builder.SetOption(std::make_unique<TKeepAliveOption>()); + } + + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + CQS_.push_back(builder.AddCompletionQueue()); + } + } else { + CQS_.push_back(builder.AddCompletionQueue()); + } + + if (Options_.GRpcMemoryQuotaBytes) { + // See details KIKIMR-6932 + /* + grpc::ResourceQuota quota("memory_bound"); + quota.Resize(Options_.GRpcMemoryQuotaBytes); + + builder.SetResourceQuota(quota); + */ + Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl; + } + Options_.ServerBuilderMutator(builder); + builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel); + + Server_ = builder.BuildAndStart(); + if (!Server_) { + ythrow yexception() << "can't start grpc server on " << server_address; + } + + size_t index = 0; + for (IGRpcServicePtr service : Services_) { + // TODO: provide something else for services instead of ServerCompletionQueue + service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); + } + + if (Options_.UseCompletionQueuePerThread) { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[i]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } + } else { + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[0]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); + } + } + + if (Options_.ExternalListener) { + Options_.ExternalListener->Start(); + } +} + +void TGRpcServer::Stop() { + for (auto& service : Services_) { + service->StopService(); + } + + auto now = TInstant::Now(); + + if (Server_) { + i64 sec = Options_.GRpcShutdownDeadline.Seconds(); + Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); + i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); + Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); + } + + for (ui64 attempt = 0; ; ++attempt) { + bool unsafe = false; + size_t infly = 0; + for (auto& service : Services_) { + unsafe |= service->IsUnsafeToShutdown(); + infly += service->RequestsInProgress(); + } + + if (!unsafe && !infly) + break; + + auto spent = (TInstant::Now() - now).SecondsFloat(); + if (attempt % 300 == 0) { + // don't log too much + Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl; + } + + if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) + break; + Sleep(TDuration::MilliSeconds(10)); + } + + // Always shutdown the completion queue after the server. + for (auto& cq : CQS_) { + cq->Shutdown(); + } + + for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { + (*ti)->Join(); + } + + Ts.clear(); + + if (Options_.ExternalListener) { + Options_.ExternalListener->Stop(); + } +} + +ui16 TGRpcServer::GetPort() const { + return Options_.Port; +} + +TString TGRpcServer::GetHost() const { + return Options_.Host; +} + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h new file mode 100644 index 0000000000..d6814a90a0 --- /dev/null +++ b/library/cpp/grpc/server/grpc_server.h @@ -0,0 +1,356 @@ +#pragma once + +#include "grpc_request_base.h" +#include "logger.h" + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/generic/maybe.h> +#include <util/generic/queue.h> +#include <util/generic/hash_set.h> +#include <util/system/types.h> +#include <util/system/mutex.h> +#include <util/thread/factory.h> + +#include <grpc++/grpc++.h> + +namespace NGrpc { + +constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; + +struct TSslData { + TString Cert; + TString Key; + TString Root; +}; + +struct IExternalListener + : public TThrRefBase +{ + using TPtr = TIntrusivePtr<IExternalListener>; + virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0; + virtual void Start() = 0; + virtual void Stop() = 0; +}; + +//! Server's options. +struct TServerOptions { +#define DECLARE_FIELD(name, type, default) \ + type name{default}; \ + inline TServerOptions& Set##name(const type& value) { \ + name = value; \ + return *this; \ + } + + //! Hostname of server to bind to. + DECLARE_FIELD(Host, TString, "[::]"); + //! Service port. + DECLARE_FIELD(Port, ui16, 0); + + //! Number of worker threads. + DECLARE_FIELD(WorkerThreads, size_t, 2); + + //! Create one completion queue per thread + DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); + + //! Memory quota size for grpc server in bytes. Zero means unlimited. + DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); + + //! How long to wait until pending rpcs are forcefully terminated. + DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); + + //! In/Out message size limit + DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); + + //! Use GRpc keepalive + DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); + + //! GRPC_ARG_KEEPALIVE_TIME_MS setting + DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0); + + //! Deprecated, ths option ignored. Will be removed soon. + DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0); + + //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting + DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0); + + //! Max number of requests processing by services (global limit for grpc server) + DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); + + //! SSL server data + DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); + + //! GRPC auth + DECLARE_FIELD(UseAuth, bool, false); + + //! Default compression level. Used when no compression options provided by client. + // Mapping to particular compression algorithm depends on client. + DECLARE_FIELD(DefaultCompressionLevel, grpc_compression_level, GRPC_COMPRESS_LEVEL_NONE); + + //! Custom configurator for ServerBuilder. + DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){}); + + DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr); + + //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). + DECLARE_FIELD(Logger, TLoggerPtr, nullptr); + +#undef DECLARE_FIELD +}; + +class IQueueEvent { +public: + virtual ~IQueueEvent() = default; + + //! Execute an action defined by implementation. + virtual bool Execute(bool ok) = 0; + + //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also + // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does + // not implement in flight management. + virtual void Process() {} + + //! Finish and destroy request. + virtual void DestroyRequest() = 0; +}; + +class ICancelableContext { +public: + virtual void Shutdown() = 0; + virtual ~ICancelableContext() = default; +}; + +template <class TLimit> +class TInFlightLimiterImpl { +public: + explicit TInFlightLimiterImpl(const TLimit& limit) + : Limit_(limit) + {} + + bool Inc() { + i64 newVal; + i64 prev; + do { + prev = AtomicGet(CurInFlightReqs_); + Y_VERIFY(prev >= 0); + if (Limit_ && prev > Limit_) { + return false; + } + newVal = prev + 1; + } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); + return true; + } + + void Dec() { + i64 newVal = AtomicDecrement(CurInFlightReqs_); + Y_VERIFY(newVal >= 0); + } + + i64 GetCurrentInFlight() const { + return AtomicGet(CurInFlightReqs_); + } + +private: + const TLimit Limit_; + TAtomic CurInFlightReqs_ = 0; +}; + +using TGlobalLimiter = TInFlightLimiterImpl<i64>; + + +class IGRpcService: public TThrRefBase { +public: + virtual grpc::Service* GetService() = 0; + virtual void StopService() noexcept = 0; + virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; + virtual bool IsUnsafeToShutdown() const = 0; + virtual size_t RequestsInProgress() const = 0; + + /** + * Called before service is added to the server builder. This allows + * service to inspect server options and initialize accordingly. + */ + virtual void SetServerOptions(const TServerOptions& options) = 0; +}; + +template<typename T> +class TGrpcServiceBase: public IGRpcService { +public: + class TShutdownGuard { + using TOwner = TGrpcServiceBase<T>; + friend class TGrpcServiceBase<T>; + + public: + TShutdownGuard() + : Owner(nullptr) + { } + + ~TShutdownGuard() { + Release(); + } + + TShutdownGuard(TShutdownGuard&& other) + : Owner(other.Owner) + { + other.Owner = nullptr; + } + + TShutdownGuard& operator=(TShutdownGuard&& other) { + if (Y_LIKELY(this != &other)) { + Release(); + Owner = other.Owner; + other.Owner = nullptr; + } + return *this; + } + + explicit operator bool() const { + return bool(Owner); + } + + void Release() { + if (Owner) { + AtomicDecrement(Owner->GuardCount_); + Owner = nullptr; + } + } + + TShutdownGuard(const TShutdownGuard&) = delete; + TShutdownGuard& operator=(const TShutdownGuard&) = delete; + + private: + explicit TShutdownGuard(TOwner* owner) + : Owner(owner) + { } + + private: + TOwner* Owner; + }; + +public: + using TCurrentGRpcService = T; + + void StopService() noexcept override { + with_lock(Lock_) { + AtomicSet(ShuttingDown_, 1); + + // Send TryCansel to event (can be send after finishing). + // Actual dtors will be called from grpc thread, so deadlock impossible + for (auto* request : Requests_) { + request->Shutdown(); + } + } + } + + TShutdownGuard ProtectShutdown() noexcept { + AtomicIncrement(GuardCount_); + if (IsShuttingDown()) { + AtomicDecrement(GuardCount_); + return { }; + } + + return TShutdownGuard(this); + }; + + bool IsUnsafeToShutdown() const override { + return AtomicGet(GuardCount_) > 0; + } + + size_t RequestsInProgress() const override { + size_t c = 0; + with_lock(Lock_) { + c = Requests_.size(); + } + return c; + } + + void SetServerOptions(const TServerOptions& options) override { + SslServer_ = bool(options.SslData); + NeedAuth_ = options.UseAuth; + } + + void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} + + //! Check if the server is going to shut down. + bool IsShuttingDown() const { + return AtomicGet(ShuttingDown_); + } + + bool SslServer() const { + return SslServer_; + } + + bool NeedAuth() const { + return NeedAuth_; + } + + bool RegisterRequestCtx(ICancelableContext* req) { + with_lock(Lock_) { + auto r = Requests_.emplace(req); + Y_VERIFY(r.second, "Ctx already registered"); + + if (IsShuttingDown()) { + // Server is already shutting down + Requests_.erase(r.first); + return false; + } + } + + return true; + } + + void DeregisterRequestCtx(ICancelableContext* req) { + with_lock(Lock_) { + Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); + } + } + +protected: + using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; + TGrpcAsyncService Service_; + + TGrpcAsyncService* GetService() override { + return &Service_; + } + +private: + TAtomic ShuttingDown_ = 0; + TAtomic GuardCount_ = 0; + + bool SslServer_ = false; + bool NeedAuth_ = false; + + THashSet<ICancelableContext*> Requests_; + TAdaptiveLock Lock_; +}; + +class TGRpcServer { +public: + using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; + TGRpcServer(const TServerOptions& opts); + ~TGRpcServer(); + void AddService(IGRpcServicePtr service); + void Start(); + // Send stop to registred services and call Shutdown on grpc server + // This method MUST be called before destroying TGRpcServer + void Stop(); + ui16 GetPort() const; + TString GetHost() const; + +private: + using IThreadRef = TAutoPtr<IThreadFactory::IThread>; + + const TServerOptions Options_; + std::unique_ptr<grpc::Server> Server_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; + TVector<IThreadRef> Ts; + + TVector<IGRpcServicePtr> Services_; + TGlobalLimiter Limiter_; +}; + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/logger.h b/library/cpp/grpc/server/logger.h new file mode 100644 index 0000000000..53af26be9c --- /dev/null +++ b/library/cpp/grpc/server/logger.h @@ -0,0 +1,43 @@ +#pragma once + +#include <library/cpp/logger/priority.h> + +#include <util/generic/ptr.h> + +namespace NGrpc { + +class TLogger: public TThrRefBase { +protected: + TLogger() = default; + +public: + [[nodiscard]] + bool IsEnabled(ELogPriority priority) const noexcept { + return DoIsEnabled(priority); + } + + void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept { + va_list args; + va_start(args, format); + DoWrite(priority, format, args); + va_end(args); + } + +protected: + virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0; + virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0; +}; + +using TLoggerPtr = TIntrusivePtr<TLogger>; + +#define GRPC_LOG_DEBUG(logger, format, ...) \ + if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \ + logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \ + } else { } + +#define GRPC_LOG_INFO(logger, format, ...) \ + if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \ + logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \ + } else { } + +} // namespace NGrpc diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp new file mode 100644 index 0000000000..8abc4e4e0e --- /dev/null +++ b/library/cpp/grpc/server/ut/grpc_response_ut.cpp @@ -0,0 +1,88 @@ +#include <library/cpp/grpc/server/grpc_response.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <google/protobuf/duration.pb.h> +#include <grpc++/impl/codegen/proto_utils.h> +#include <grpc++/impl/grpc_library.h> + +static ::grpc::internal::GrpcLibraryInitializer grpcInitializer; + +using namespace NGrpc; + +using google::protobuf::Duration; + +Y_UNIT_TEST_SUITE(ResponseTest) { + + template <typename T> + grpc::ByteBuffer Serialize(T resp) { + grpc::ByteBuffer buf; + bool ownBuf = false; + grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf); + UNIT_ASSERT(status.ok()); + return buf; + } + + template <typename T> + T Deserialize(grpc::ByteBuffer* buf) { + T message; + auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message); + UNIT_ASSERT(status.ok()); + return message; + } + + Y_UNIT_TEST(UniversalResponseMsg) { + Duration d1; + d1.set_seconds(12345); + d1.set_nanos(67890); + + auto buf = Serialize(TUniversalResponse<Duration>(&d1)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); + } + + Y_UNIT_TEST(UniversalResponseBuf) { + Duration d1; + d1.set_seconds(123); + d1.set_nanos(456); + + TString data = d1.SerializeAsString(); + grpc::Slice dataSlice{data.data(), data.size()}; + grpc::ByteBuffer dataBuf{&dataSlice, 1}; + + auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); + } + + Y_UNIT_TEST(UniversalResponseRefMsg) { + Duration d1; + d1.set_seconds(12345); + d1.set_nanos(67890); + + auto buf = Serialize(TUniversalResponseRef<Duration>(&d1)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890); + } + + Y_UNIT_TEST(UniversalResponseRefBuf) { + Duration d1; + d1.set_seconds(123); + d1.set_nanos(456); + + TString data = d1.SerializeAsString(); + grpc::Slice dataSlice{data.data(), data.size()}; + grpc::ByteBuffer dataBuf{&dataSlice, 1}; + + auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf)); + Duration d2 = Deserialize<Duration>(&buf); + + UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123); + UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456); + } +} diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp new file mode 100644 index 0000000000..c34d3b8c2b --- /dev/null +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -0,0 +1,121 @@ +#include <library/cpp/grpc/server/grpc_request.h> +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> + +#include <util/system/thread.h> +#include <util/thread/pool.h> + +using namespace NGrpc; + +// Here we emulate stream data producer +class TOrderedProducer: public TThread { +public: + TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) + : TThread(&ThreadProc, this) + , Adaptor_(adaptor) + , Max_(max) + , WithSleep_(withSleep) + , ConsumerOp_(std::move(consumerOp)) + {} + + static void* ThreadProc(void* _this) { + SetCurrentThreadName("OrderedProducerThread"); + static_cast<TOrderedProducer*>(_this)->Exec(); + return nullptr; + } + + void Exec() { + for (ui64 i = 0; i < Max_; i++) { + auto cb = [i, this]() mutable { + ConsumerOp_(i); + }; + Adaptor_->Enqueue(std::move(cb), false); + if (WithSleep_ && (i % 256 == 0)) { + Sleep(TDuration::MilliSeconds(10)); + } + } + } + +private: + IStreamAdaptor* Adaptor_; + const ui64 Max_; + const bool WithSleep_; + std::function<void(ui64)> ConsumerOp_; +}; + +Y_UNIT_TEST_SUITE(StreamAdaptor) { + static void OrderingTest(size_t threads, bool withSleep) { + + auto adaptor = CreateStreamAdaptor(); + + const i64 max = 10000; + + // Here we will emulate grpc stream (NextReply call after writing) + std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false))); + // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) + consumerQueue->Start(threads, 1); + + // Non atomic!!! Stream adaptor must protect us + ui64 curVal = 0; + + // Used just to wait in the main thread + TAtomic finished = false; + auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { + // Check no reordering inside stream adaptor + // and no simultanious consumer Op call + UNIT_ASSERT_VALUES_EQUAL(curVal, i); + curVal++; + // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext + // so compare here and set after + bool tmp = curVal == max; + bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { + // Additional check the value still same + // run under tsan makes sure no consumer Op call before we call ProcessNext + UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); + ptr->ProcessNext(); + // Reordering after ProcessNext is possible, so check tmp and set finished to true + if (tmp) + AtomicSet(finished, true); + }); + UNIT_ASSERT(res); + }; + + TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); + + producer.Start(); + producer.Join(); + + while (!AtomicGet(finished)) + { + Sleep(TDuration::MilliSeconds(100)); + } + + consumerQueue->Stop(); + + UNIT_ASSERT_VALUES_EQUAL(curVal, max); + } + + Y_UNIT_TEST(OrderingOneThread) { + OrderingTest(1, false); + } + + Y_UNIT_TEST(OrderingTwoThreads) { + OrderingTest(2, false); + } + + Y_UNIT_TEST(OrderingManyThreads) { + OrderingTest(10, false); + } + + Y_UNIT_TEST(OrderingOneThreadWithSleep) { + OrderingTest(1, true); + } + + Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { + OrderingTest(2, true); + } + + Y_UNIT_TEST(OrderingManyThreadsWithSleep) { + OrderingTest(10, true); + } +} diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make new file mode 100644 index 0000000000..feb3291af9 --- /dev/null +++ b/library/cpp/grpc/server/ut/ya.make @@ -0,0 +1,21 @@ +UNITTEST_FOR(library/cpp/grpc/server) + +OWNER( + dcherednik + g:kikimr +) + +TIMEOUT(600) +SIZE(MEDIUM) + +PEERDIR( + library/cpp/grpc/server +) + +SRCS( + grpc_response_ut.cpp + stream_adaptor_ut.cpp +) + +END() + diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make new file mode 100644 index 0000000000..356a1b6793 --- /dev/null +++ b/library/cpp/grpc/server/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +OWNER( + dcherednik + g:kikimr +) + +SRCS( + event_callback.cpp + grpc_request.cpp + grpc_server.cpp + grpc_counters.cpp +) + +GENERATE_ENUM_SERIALIZATION(grpc_request_base.h) + +PEERDIR( + contrib/libs/grpc + library/cpp/monlib/dynamic_counters/percentile +) + +END() + +RECURSE_FOR_TESTS(ut) + |