diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
commit | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server | |
parent | 17e20fa084178ddcb16255f974dbde74fb93608b (diff) | |
download | ydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server')
-rw-r--r-- | library/cpp/grpc/server/event_callback.cpp | 2 | ||||
-rw-r--r-- | library/cpp/grpc/server/event_callback.h | 144 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_async_ctx_base.h | 122 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.cpp | 2 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_counters.h | 134 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.cpp | 112 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 696 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 152 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_response.h | 2 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 290 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 404 | ||||
-rw-r--r-- | library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | 230 | ||||
-rw-r--r-- | library/cpp/grpc/server/ut/ya.make | 32 | ||||
-rw-r--r-- | library/cpp/grpc/server/ya.make | 42 |
14 files changed, 1182 insertions, 1182 deletions
diff --git a/library/cpp/grpc/server/event_callback.cpp b/library/cpp/grpc/server/event_callback.cpp index 559a4728077..f423836bd61 100644 --- a/library/cpp/grpc/server/event_callback.cpp +++ b/library/cpp/grpc/server/event_callback.cpp @@ -1 +1 @@ -#include "event_callback.h" +#include "event_callback.h" diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h index c0e16ee5043..d0b700b3c92 100644 --- a/library/cpp/grpc/server/event_callback.h +++ b/library/cpp/grpc/server/event_callback.h @@ -1,80 +1,80 @@ -#pragma once - -#include "grpc_server.h" - +#pragma once + +#include "grpc_server.h" + namespace NGrpc { - -enum class EQueueEventStatus { - OK, - ERROR -}; - -template<class TCallback> + +enum class EQueueEventStatus { + OK, + ERROR +}; + +template<class TCallback> class TQueueEventCallback: public IQueueEvent { -public: - TQueueEventCallback(const TCallback& callback) - : Callback(callback) - {} - - TQueueEventCallback(TCallback&& callback) - : Callback(std::move(callback)) - {} - - bool Execute(bool ok) override { - Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); - return false; - } - - void DestroyRequest() override { - delete this; - } - -private: - TCallback Callback; -}; - +public: + TQueueEventCallback(const TCallback& callback) + : Callback(callback) + {} + + TQueueEventCallback(TCallback&& callback) + : Callback(std::move(callback)) + {} + + bool Execute(bool ok) override { + Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); + return false; + } + + void DestroyRequest() override { + delete this; + } + +private: + TCallback Callback; +}; + // Implementation of IQueueEvent that reduces allocations -template<class TSelf> +template<class TSelf> class TQueueFixedEvent: private IQueueEvent { - using TCallback = void (TSelf::*)(EQueueEventStatus); - -public: - TQueueFixedEvent(TSelf* self, TCallback callback) - : Self(self) - , Callback(callback) - { } - + using TCallback = void (TSelf::*)(EQueueEventStatus); + +public: + TQueueFixedEvent(TSelf* self, TCallback callback) + : Self(self) + , Callback(callback) + { } + IQueueEvent* Prepare() { - Self->Ref(); - return this; - } - -private: - bool Execute(bool ok) override { - ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); - return false; - } - - void DestroyRequest() override { - Self->UnRef(); - } - -private: - TSelf* const Self; - TCallback const Callback; -}; - -template<class TCallback> + Self->Ref(); + return this; + } + +private: + bool Execute(bool ok) override { + ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR); + return false; + } + + void DestroyRequest() override { + Self->UnRef(); + } + +private: + TSelf* const Self; + TCallback const Callback; +}; + +template<class TCallback> inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) { - return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback)); -} - -template<class T> + return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback)); +} + +template<class T> inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) { - using TPtr = TIntrusivePtr<T>; - return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) { - ((*self).*method)(status); - }); -} - + using TPtr = TIntrusivePtr<T>; + return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) { + ((*self).*method)(status); + }); +} + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h index d170e2ef340..51356d4ce5a 100644 --- a/library/cpp/grpc/server/grpc_async_ctx_base.h +++ b/library/cpp/grpc/server/grpc_async_ctx_base.h @@ -1,48 +1,48 @@ -#pragma once - -#include "grpc_server.h" - -#include <util/generic/vector.h> -#include <util/generic/string.h> -#include <util/system/yassert.h> +#pragma once + +#include "grpc_server.h" + +#include <util/generic/vector.h> +#include <util/generic/string.h> +#include <util/system/yassert.h> #include <util/generic/set.h> - -#include <grpc++/server.h> -#include <grpc++/server_context.h> - -#include <chrono> - + +#include <grpc++/server.h> +#include <grpc++/server_context.h> + +#include <chrono> + namespace NGrpc { - -template<typename TService> + +template<typename TService> class TBaseAsyncContext: public ICancelableContext { -public: - TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) - : Service(service) - , CQ(cq) - { - } - - TString GetPeerName() const { - return TString(Context.peer()); - } - - TInstant Deadline() const { - // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline +public: + TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq) + : Service(service) + , CQ(cq) + { + } + + TString GetPeerName() const { + return TString(Context.peer()); + } + + TInstant Deadline() const { + // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline // right before the request is getting to be send. // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md // - // After this timeout calculated back to the deadline on the server side - // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). - // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME - // - - std::chrono::system_clock::time_point t = Context.deadline(); - if (t == std::chrono::system_clock::time_point::max()) { - return TInstant::Max(); + // After this timeout calculated back to the deadline on the server side + // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method). + // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME + // + + std::chrono::system_clock::time_point t = Context.deadline(); + if (t == std::chrono::system_clock::time_point::max()) { + return TInstant::Max(); } - auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); - return TInstant::MicroSeconds(us.time_since_epoch().count()); + auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t); + return TInstant::MicroSeconds(us.time_since_epoch().count()); } TSet<TStringBuf> GetPeerMetaKeys() const { @@ -54,7 +54,7 @@ public: } TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const { - const auto& clientMetadata = Context.client_metadata(); + const auto& clientMetadata = Context.client_metadata(); const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()}); if (range.first == range.second) { return {}; @@ -63,32 +63,32 @@ public: TVector<TStringBuf> values; values.reserve(std::distance(range.first, range.second)); - for (auto it = range.first; it != range.second; ++it) { + for (auto it = range.first; it != range.second; ++it) { values.emplace_back(it->second.data(), it->second.size()); - } + } return values; - } - + } + grpc_compression_level GetCompressionLevel() const { return Context.compression_level(); } - void Shutdown() override { - // Shutdown may only be called after request has started successfully - if (Context.c_call()) - Context.TryCancel(); - } - -protected: - //! The means of communication with the gRPC runtime for an asynchronous - //! server. - typename TService::TCurrentGRpcService::AsyncService* const Service; - //! The producer-consumer queue where for asynchronous server notifications. - grpc::ServerCompletionQueue* const CQ; - //! Context for the rpc, allowing to tweak aspects of it such as the use - //! of compression, authentication, as well as to send metadata back to the - //! client. - grpc::ServerContext Context; -}; - + void Shutdown() override { + // Shutdown may only be called after request has started successfully + if (Context.c_call()) + Context.TryCancel(); + } + +protected: + //! The means of communication with the gRPC runtime for an asynchronous + //! server. + typename TService::TCurrentGRpcService::AsyncService* const Service; + //! The producer-consumer queue where for asynchronous server notifications. + grpc::ServerCompletionQueue* const CQ; + //! Context for the rpc, allowing to tweak aspects of it such as the use + //! of compression, authentication, as well as to send metadata back to the + //! client. + grpc::ServerContext Context; +}; + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp index f3a0b2b5e98..fa96e0100b9 100644 --- a/library/cpp/grpc/server/grpc_counters.cpp +++ b/library/cpp/grpc/server/grpc_counters.cpp @@ -1,4 +1,4 @@ -#include "grpc_counters.h" +#include "grpc_counters.h" namespace NGrpc { namespace { diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h index 25b7c28369c..0b6c36c84cc 100644 --- a/library/cpp/grpc/server/grpc_counters.h +++ b/library/cpp/grpc/server/grpc_counters.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once + #include <library/cpp/monlib/dynamic_counters/percentile/percentile.h> #include <library/cpp/monlib/dynamic_counters/counters.h> -#include <util/generic/ptr.h> - +#include <util/generic/ptr.h> + namespace NGrpc { - + struct ICounterBlock : public TThrRefBase { virtual void CountNotOkRequest() = 0; virtual void CountNotOkResponse() = 0; @@ -15,9 +15,9 @@ struct ICounterBlock : public TThrRefBase { virtual void CountResponseBytes(ui32 responseSize) = 0; virtual void StartProcessing(ui32 requestSize) = 0; virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0; - virtual void CountRequestsWithoutDatabase() {} - virtual void CountRequestsWithoutToken() {} - virtual void CountRequestWithoutTls() {} + virtual void CountRequestsWithoutDatabase() {} + virtual void CountRequestsWithoutToken() {} + virtual void CountRequestWithoutTls() {} virtual TIntrusivePtr<ICounterBlock> Clone() { return this; } virtual void UseDatabase(const TString& database) { Y_UNUSED(database); } @@ -26,62 +26,62 @@ struct ICounterBlock : public TThrRefBase { using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>; class TCounterBlock final : public ICounterBlock { - NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; - NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; - NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; - NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter; - NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; - NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes; - NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes; - NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; - NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; + NMonitoring::TDynamicCounters::TCounterPtr TotalCounter; + NMonitoring::TDynamicCounters::TCounterPtr InflyCounter; + NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter; + NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter; + NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes; + NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes; + NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated; + NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted; bool Percentile = false; NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs; - std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters; - -public: - TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter, - NMonitoring::TDynamicCounters::TCounterPtr inflyCounter, - NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter, - NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter, - NMonitoring::TDynamicCounters::TCounterPtr requestBytes, - NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes, + std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters; + +public: + TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter, + NMonitoring::TDynamicCounters::TCounterPtr inflyCounter, + NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter, + NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter, + NMonitoring::TDynamicCounters::TCounterPtr requestBytes, + NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes, NMonitoring::TDynamicCounters::TCounterPtr responseBytes, - NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated, - NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted, + NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated, + NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted, TIntrusivePtr<NMonitoring::TDynamicCounters> group) - : TotalCounter(std::move(totalCounter)) - , InflyCounter(std::move(inflyCounter)) - , NotOkRequestCounter(std::move(notOkRequestCounter)) - , NotOkResponseCounter(std::move(notOkResponseCounter)) - , RequestBytes(std::move(requestBytes)) - , InflyRequestBytes(std::move(inflyRequestBytes)) - , ResponseBytes(std::move(responseBytes)) - , NotAuthenticated(std::move(notAuthenticated)) - , ResourceExhausted(std::move(resourceExhausted)) + : TotalCounter(std::move(totalCounter)) + , InflyCounter(std::move(inflyCounter)) + , NotOkRequestCounter(std::move(notOkRequestCounter)) + , NotOkResponseCounter(std::move(notOkResponseCounter)) + , RequestBytes(std::move(requestBytes)) + , InflyRequestBytes(std::move(inflyRequestBytes)) + , ResponseBytes(std::move(responseBytes)) + , NotAuthenticated(std::move(notAuthenticated)) + , ResourceExhausted(std::move(resourceExhausted)) { if (group) { RequestHistMs.Initialize(group, "event", "request", "ms", {0.5f, 0.9f, 0.99f, 0.999f, 1.0f}); Percentile = true; } } - + void CountNotOkRequest() override { - NotOkRequestCounter->Inc(); - } - + NotOkRequestCounter->Inc(); + } + void CountNotOkResponse() override { - NotOkResponseCounter->Inc(); - } - + NotOkResponseCounter->Inc(); + } + void CountNotAuthenticated() override { - NotAuthenticated->Inc(); - } - + NotAuthenticated->Inc(); + } + void CountResourceExhausted() override { - ResourceExhausted->Inc(); - } - + ResourceExhausted->Inc(); + } + void CountRequestBytes(ui32 requestSize) override { *RequestBytes += requestSize; } @@ -91,27 +91,27 @@ public: } void StartProcessing(ui32 requestSize) override { - TotalCounter->Inc(); - InflyCounter->Inc(); - *RequestBytes += requestSize; - *InflyRequestBytes += requestSize; - } - + TotalCounter->Inc(); + InflyCounter->Inc(); + *RequestBytes += requestSize; + *InflyRequestBytes += requestSize; + } + void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) override { Y_UNUSED(status); - InflyCounter->Dec(); - *InflyRequestBytes -= requestSize; - *ResponseBytes += responseSize; - if (!ok) { - NotOkResponseCounter->Inc(); - } + InflyCounter->Dec(); + *InflyRequestBytes -= requestSize; + *ResponseBytes += responseSize; + if (!ok) { + NotOkResponseCounter->Inc(); + } if (Percentile) { RequestHistMs.Increment(requestDuration.MilliSeconds()); } - } + } ICounterBlockPtr Clone() override { return this; @@ -122,10 +122,10 @@ public: RequestHistMs.Update(); } } -}; - -using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; - +}; + +using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>; + /** * Creates new instance of ICounterBlock implementation which does nothing. * diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp index 33264fe6f29..d18a32776f2 100644 --- a/library/cpp/grpc/server/grpc_request.cpp +++ b/library/cpp/grpc/server/grpc_request.cpp @@ -1,59 +1,59 @@ -#include "grpc_request.h" - +#include "grpc_request.h" + namespace NGrpc { - -const char* GRPC_USER_AGENT_HEADER = "user-agent"; - + +const char* GRPC_USER_AGENT_HEADER = "user-agent"; + class TStreamAdaptor: public IStreamAdaptor { -public: - TStreamAdaptor() - : StreamIsReady_(true) - {} - - void Enqueue(std::function<void()>&& fn, bool urgent) override { - with_lock(Mtx_) { - if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { - Y_VERIFY(!StreamIsReady_); - } - auto& queue = urgent ? UrgentQueue_ : NormalQueue_; - if (StreamIsReady_ && queue.empty()) { - StreamIsReady_ = false; - } else { - queue.push_back(std::move(fn)); - return; - } - } - fn(); - } - - size_t ProcessNext() override { - size_t left = 0; - std::function<void()> fn; - with_lock(Mtx_) { - Y_VERIFY(!StreamIsReady_); - auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; - if (queue.empty()) { - // Both queues are empty - StreamIsReady_ = true; - } else { - fn = std::move(queue.front()); - queue.pop_front(); - left = UrgentQueue_.size() + NormalQueue_.size(); - } - } - if (fn) - fn(); - return left; - } -private: - bool StreamIsReady_; - TList<std::function<void()>> NormalQueue_; - TList<std::function<void()>> UrgentQueue_; - TMutex Mtx_; -}; - -IStreamAdaptor::TPtr CreateStreamAdaptor() { - return std::make_unique<TStreamAdaptor>(); -} - +public: + TStreamAdaptor() + : StreamIsReady_(true) + {} + + void Enqueue(std::function<void()>&& fn, bool urgent) override { + with_lock(Mtx_) { + if (!UrgentQueue_.empty() || !NormalQueue_.empty()) { + Y_VERIFY(!StreamIsReady_); + } + auto& queue = urgent ? UrgentQueue_ : NormalQueue_; + if (StreamIsReady_ && queue.empty()) { + StreamIsReady_ = false; + } else { + queue.push_back(std::move(fn)); + return; + } + } + fn(); + } + + size_t ProcessNext() override { + size_t left = 0; + std::function<void()> fn; + with_lock(Mtx_) { + Y_VERIFY(!StreamIsReady_); + auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_; + if (queue.empty()) { + // Both queues are empty + StreamIsReady_ = true; + } else { + fn = std::move(queue.front()); + queue.pop_front(); + left = UrgentQueue_.size() + NormalQueue_.size(); + } + } + if (fn) + fn(); + return left; + } +private: + bool StreamIsReady_; + TList<std::function<void()>> NormalQueue_; + TList<std::function<void()>> UrgentQueue_; + TMutex Mtx_; +}; + +IStreamAdaptor::TPtr CreateStreamAdaptor() { + return std::make_unique<TStreamAdaptor>(); +} + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index 2841069d62f..5bd8d3902b5 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -1,118 +1,118 @@ -#pragma once - +#pragma once + #include <google/protobuf/text_format.h> #include <google/protobuf/arena.h> #include <google/protobuf/message.h> - + #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/priority.h> - + #include "grpc_response.h" -#include "event_callback.h" +#include "event_callback.h" #include "grpc_async_ctx_base.h" #include "grpc_counters.h" -#include "grpc_request_base.h" -#include "grpc_server.h" +#include "grpc_request_base.h" +#include "grpc_server.h" #include "logger.h" - + #include <util/system/hp_timer.h> -#include <grpc++/server.h> -#include <grpc++/server_context.h> +#include <grpc++/server.h> +#include <grpc++/server_context.h> #include <grpc++/support/async_stream.h> #include <grpc++/support/async_unary_call.h> -#include <grpc++/support/byte_buffer.h> -#include <grpc++/impl/codegen/async_stream.h> - +#include <grpc++/support/byte_buffer.h> +#include <grpc++/impl/codegen/async_stream.h> + namespace NGrpc { - -class IStreamAdaptor { -public: - using TPtr = std::unique_ptr<IStreamAdaptor>; - virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0; - virtual size_t ProcessNext() = 0; - virtual ~IStreamAdaptor() = default; -}; - -IStreamAdaptor::TPtr CreateStreamAdaptor(); - -/////////////////////////////////////////////////////////////////////////////// + +class IStreamAdaptor { +public: + using TPtr = std::unique_ptr<IStreamAdaptor>; + virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0; + virtual size_t ProcessNext() = 0; + virtual ~IStreamAdaptor() = default; +}; + +IStreamAdaptor::TPtr CreateStreamAdaptor(); + +/////////////////////////////////////////////////////////////////////////////// template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter> class TGRpcRequestImpl - : public TBaseAsyncContext<TService> - , public IQueueEvent - , public IRequestContextBase -{ + : public TBaseAsyncContext<TService> + , public IQueueEvent + , public IRequestContextBase +{ using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; -public: - using TOnRequest = std::function<void (IRequestContextBase* ctx)>; - using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, - grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, - grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); - +public: + using TOnRequest = std::function<void (IRequestContextBase* ctx)>; + using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, + grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*, + grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*); + TGRpcRequestImpl(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - TOnRequest cb, - TRequestCallback requestCallback, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + TOnRequest cb, + TRequestCallback requestCallback, const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) - : TBaseAsyncContext<TService>(service, cq) - , Server_(server) - , Cb_(cb) - , RequestCallback_(requestCallback) - , StreamRequestCallback_(nullptr) - , Name_(name) + : TBaseAsyncContext<TService>(service, cq) + , Server_(server) + , Cb_(cb) + , RequestCallback_(requestCallback) + , StreamRequestCallback_(nullptr) + , Name_(name) , Logger_(std::move(logger)) - , Counters_(std::move(counters)) + , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) - { - AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); - Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); - Y_VERIFY(Request_); + { + AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); + Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); + Y_VERIFY(Request_); GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_); - FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); - } - + FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); + } + TGRpcRequestImpl(TService* server, - typename TService::TCurrentGRpcService::AsyncService* service, - grpc::ServerCompletionQueue* cq, - TOnRequest cb, - TStreamRequestCallback requestCallback, + typename TService::TCurrentGRpcService::AsyncService* service, + grpc::ServerCompletionQueue* cq, + TOnRequest cb, + TStreamRequestCallback requestCallback, const char* name, TLoggerPtr logger, ICounterBlockPtr counters, IGRpcRequestLimiterPtr limiter) - : TBaseAsyncContext<TService>(service, cq) - , Server_(server) - , Cb_(cb) - , RequestCallback_(nullptr) - , StreamRequestCallback_(requestCallback) - , Name_(name) + : TBaseAsyncContext<TService>(service, cq) + , Server_(server) + , Cb_(cb) + , RequestCallback_(nullptr) + , StreamRequestCallback_(requestCallback) + , Name_(name) , Logger_(std::move(logger)) - , Counters_(std::move(counters)) + , Counters_(std::move(counters)) , RequestLimiter_(std::move(limiter)) - , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) + , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context)) , StateFunc_(&TThis::SetRequestDone) - { - AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); - Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); - Y_VERIFY(Request_); + { + AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false); + Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_); + Y_VERIFY(Request_); GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_); - FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); - StreamAdaptor_ = CreateStreamAdaptor(); - } - - TAsyncFinishResult GetFinishFuture() override { - return FinishPromise_.GetFuture(); - } - + FinishPromise_ = NThreading::NewPromise<EFinishStatus>(); + StreamAdaptor_ = CreateStreamAdaptor(); + } + + TAsyncFinishResult GetFinishFuture() override { + return FinishPromise_.GetFuture(); + } + TString GetPeer() const override { return TString(this->Context.peer()); } @@ -121,7 +121,7 @@ public: return Server_->SslServer(); } - void Run() { + void Run() { // Start request unless server is shutting down if (auto guard = Server_->ProtectShutdown()) { Ref(); //For grpc c runtime @@ -135,28 +135,28 @@ public: (&this->Context, Request_, reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag()); } - } - } - + } + } + ~TGRpcRequestImpl() { - // No direct dtor call allowed - Y_ASSERT(RefCount() == 0); - } - - bool Execute(bool ok) override { - return (this->*StateFunc_)(ok); - } - - void DestroyRequest() override { + // No direct dtor call allowed + Y_ASSERT(RefCount() == 0); + } + + bool Execute(bool ok) override { + return (this->*StateFunc_)(ok); + } + + void DestroyRequest() override { if (RequestRegistered_) { Server_->DeregisterRequestCtx(this); RequestRegistered_ = false; } - UnRef(); - } - - TInstant Deadline() const override { - return TBaseAsyncContext<TService>::Deadline(); + UnRef(); + } + + TInstant Deadline() const override { + return TBaseAsyncContext<TService>::Deadline(); } TSet<TStringBuf> GetPeerMetaKeys() const override { @@ -164,299 +164,299 @@ public: } TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { - return TBaseAsyncContext<TService>::GetPeerMetaValues(key); - } - + return TBaseAsyncContext<TService>::GetPeerMetaValues(key); + } + grpc_compression_level GetCompressionLevel() const override { return TBaseAsyncContext<TService>::GetCompressionLevel(); } - //! Get pointer to the request's message. - const NProtoBuf::Message* GetRequest() const override { - return Request_; - } - - TAuthState& GetAuthState() override { - return AuthState_; - } - + //! Get pointer to the request's message. + const NProtoBuf::Message* GetRequest() const override { + return Request_; + } + + TAuthState& GetAuthState() override { + return AuthState_; + } + void Reply(NProtoBuf::Message* resp, ui32 status) override { ResponseStatus = status; - WriteDataOk(resp); - } - + WriteDataOk(resp); + } + void Reply(grpc::ByteBuffer* resp, ui32 status) override { ResponseStatus = status; - WriteByteDataOk(resp); - } - + WriteByteDataOk(resp); + } + void ReplyError(grpc::StatusCode code, const TString& msg) override { - FinishGrpcStatus(code, msg, false); - } - - void ReplyUnauthenticated(const TString& in) override { - const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; - FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); - } - - void SetNextReplyCallback(TOnNextReply&& cb) override { - NextReplyCb_ = cb; - } - + FinishGrpcStatus(code, msg, false); + } + + void ReplyUnauthenticated(const TString& in) override { + const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in; + FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false); + } + + void SetNextReplyCallback(TOnNextReply&& cb) override { + NextReplyCb_ = cb; + } + void AddTrailingMetadata(const TString& key, const TString& value) override { - this->Context.AddTrailingMetadata(key, value); - } - - void FinishStreamingOk() override { - GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_, - this->Context.peer().c_str()); - auto cb = [this]() { - StateFunc_ = &TThis::SetFinishDone; - GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_, - this->Context.peer().c_str()); - - StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag()); - }; - StreamAdaptor_->Enqueue(std::move(cb), false); - } - - google::protobuf::Arena* GetArena() override { - return &Arena_; - } - + this->Context.AddTrailingMetadata(key, value); + } + + void FinishStreamingOk() override { + GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_, + this->Context.peer().c_str()); + auto cb = [this]() { + StateFunc_ = &TThis::SetFinishDone; + GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_, + this->Context.peer().c_str()); + + StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + + google::protobuf::Arena* GetArena() override { + return &Arena_; + } + void UseDatabase(const TString& database) override { Counters_->UseDatabase(database); } -private: - void Clone() { - if (!Server_->IsShuttingDown()) { - if (RequestCallback_) { +private: + void Clone() { + if (!Server_->IsShuttingDown()) { + if (RequestCallback_) { MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); - } else { + } else { MakeIntrusive<TThis>( Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run(); - } - } - } - - void WriteDataOk(NProtoBuf::Message* resp) { - auto makeResponseString = [&] { - TString x; + } + } + } + + void WriteDataOk(NProtoBuf::Message* resp) { + auto makeResponseString = [&] { + TString x; TOutProtoPrinter printer; - printer.SetSingleLineMode(true); - printer.PrintToString(*resp, &x); - return x; - }; - - auto sz = (size_t)resp->ByteSize(); - if (Writer_) { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, - makeResponseString().data(), this->Context.peer().c_str()); + printer.SetSingleLineMode(true); + printer.PrintToString(*resp, &x); + return x; + }; + + auto sz = (size_t)resp->ByteSize(); + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_, + makeResponseString().data(), this->Context.peer().c_str()); StateFunc_ = &TThis::SetFinishDone; - ResponseSize = sz; - Y_VERIFY(this->Context.c_call()); + ResponseSize = sz; + Y_VERIFY(this->Context.c_call()); Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); - } else { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", - this, Name_, makeResponseString().data(), this->Context.peer().c_str()); + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)", + this, Name_, makeResponseString().data(), this->Context.peer().c_str()); // because of std::function cannot hold move-only captured object // we allocate shared object on heap to avoid message copy auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", - this, Name_, makeResponseString().data(), this->Context.peer().c_str()); - StateFunc_ = &TThis::NextReply; - ResponseSize += sz; + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)", + this, Name_, makeResponseString().data(), this->Context.peer().c_str()); + StateFunc_ = &TThis::NextReply; + ResponseSize += sz; StreamWriter_->Write(*uResp, GetGRpcTag()); - }; - StreamAdaptor_->Enqueue(std::move(cb), false); - } - } - - void WriteByteDataOk(grpc::ByteBuffer* resp) { - auto sz = resp->Length(); - if (Writer_) { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, - this->Context.peer().c_str()); + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + } + + void WriteByteDataOk(grpc::ByteBuffer* resp) { + auto sz = resp->Length(); + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_, + this->Context.peer().c_str()); StateFunc_ = &TThis::SetFinishDone; - ResponseSize = sz; + ResponseSize = sz; Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag()); - } else { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, - this->Context.peer().c_str()); + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_, + this->Context.peer().c_str()); // because of std::function cannot hold move-only captured object // we allocate shared object on heap to avoid buffer copy auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp); auto cb = [this, uResp = std::move(uResp), sz]() { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", - this, Name_, this->Context.peer().c_str()); - StateFunc_ = &TThis::NextReply; - ResponseSize += sz; + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)", + this, Name_, this->Context.peer().c_str()); + StateFunc_ = &TThis::NextReply; + ResponseSize += sz; StreamWriter_->Write(*uResp, GetGRpcTag()); - }; - StreamAdaptor_->Enqueue(std::move(cb), false); - } - } - - void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) { - Y_VERIFY(code != grpc::OK); - if (code == grpc::StatusCode::UNAUTHENTICATED) { - Counters_->CountNotAuthenticated(); - } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) { - Counters_->CountResourceExhausted(); - } - - if (Writer_) { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, - Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); - StateFunc_ = &TThis::SetFinishError; + }; + StreamAdaptor_->Enqueue(std::move(cb), false); + } + } + + void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) { + Y_VERIFY(code != grpc::OK); + if (code == grpc::StatusCode::UNAUTHENTICATED) { + Counters_->CountNotAuthenticated(); + } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) { + Counters_->CountResourceExhausted(); + } + + if (Writer_) { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this, + Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); + StateFunc_ = &TThis::SetFinishError; TOut resp; Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag()); - } else { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" - " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); - auto cb = [this, code, msg]() { - GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" - " (pushed to grpc)", this, Name_, msg.c_str(), - this->Context.peer().c_str(), (int)code); - StateFunc_ = &TThis::SetFinishError; - StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag()); - }; - StreamAdaptor_->Enqueue(std::move(cb), urgent); - } - } - - bool SetRequestDone(bool ok) { - auto makeRequestString = [&] { - TString resp; - if (ok) { + } else { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" + " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code); + auto cb = [this, code, msg]() { + GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)" + " (pushed to grpc)", this, Name_, msg.c_str(), + this->Context.peer().c_str(), (int)code); + StateFunc_ = &TThis::SetFinishError; + StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag()); + }; + StreamAdaptor_->Enqueue(std::move(cb), urgent); + } + } + + bool SetRequestDone(bool ok) { + auto makeRequestString = [&] { + TString resp; + if (ok) { TInProtoPrinter printer; - printer.SetSingleLineMode(true); - printer.PrintToString(*Request_, &resp); - } else { - resp = "<not ok>"; - } - return resp; - }; + printer.SetSingleLineMode(true); + printer.PrintToString(*Request_, &resp); + } else { + resp = "<not ok>"; + } + return resp; + }; GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_, ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str()); - - if (this->Context.c_call() == nullptr) { + + if (this->Context.c_call() == nullptr) { Y_VERIFY(!ok); - // One ref by OnFinishTag, grpc will not call this tag if no request received - UnRef(); + // One ref by OnFinishTag, grpc will not call this tag if no request received + UnRef(); } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) { // Request cannot be registered due to shutdown // It's unsafe to continue, so drop this request without processing GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_); this->Context.TryCancel(); return false; - } - - Clone(); // TODO: Request pool? - if (!ok) { - Counters_->CountNotOkRequest(); - return false; - } - + } + + Clone(); // TODO: Request pool? + if (!ok) { + Counters_->CountNotOkRequest(); + return false; + } + if (IncRequest()) { - // Adjust counters. - RequestSize = Request_->ByteSize(); - Counters_->StartProcessing(RequestSize); + // Adjust counters. + RequestSize = Request_->ByteSize(); + Counters_->StartProcessing(RequestSize); RequestTimer.Reset(); - - if (!SslServer()) { - Counters_->CountRequestWithoutTls(); - } - - //TODO: Move this in to grpc_request_proxy + + if (!SslServer()) { + Counters_->CountRequestWithoutTls(); + } + + //TODO: Move this in to grpc_request_proxy auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database")); - if (maybeDatabase.empty()) { - Counters_->CountRequestsWithoutDatabase(); - } + if (maybeDatabase.empty()) { + Counters_->CountRequestsWithoutDatabase(); + } auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket")); - if (maybeToken.empty() || maybeToken[0].empty()) { + if (maybeToken.empty() || maybeToken[0].empty()) { TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}}; - Counters_->CountRequestsWithoutToken(); - GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " - "Name# %s data# %s peer# %s database# %s", this, Name_, - makeRequestString().data(), this->Context.peer().c_str(), db.c_str()); - } - - // Handle current request. - Cb_(this); - } else { - //This request has not been counted - SkipUpdateCountersOnError = true; - FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true); - } - return true; - } - - bool NextReply(bool ok) { - auto logCb = [this, ok](int left) { - GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_, - ok ? "true" : "false", this->Context.peer().c_str(), left); - }; - - if (!ok) { - logCb(-1); + Counters_->CountRequestsWithoutToken(); + GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token " + "Name# %s data# %s peer# %s database# %s", this, Name_, + makeRequestString().data(), this->Context.peer().c_str(), db.c_str()); + } + + // Handle current request. + Cb_(this); + } else { + //This request has not been counted + SkipUpdateCountersOnError = true; + FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true); + } + return true; + } + + bool NextReply(bool ok) { + auto logCb = [this, ok](int left) { + GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_, + ok ? "true" : "false", this->Context.peer().c_str(), left); + }; + + if (!ok) { + logCb(-1); DecRequest(); - Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, - TDuration::Seconds(RequestTimer.Passed())); - return false; - } - - Ref(); // To prevent destroy during this call in case of execution Finish - size_t left = StreamAdaptor_->ProcessNext(); - logCb(left); - if (NextReplyCb_) { - NextReplyCb_(left); - } - // Now it is safe to destroy even if Finish was called - UnRef(); - return true; - } - - bool SetFinishDone(bool ok) { + Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, + TDuration::Seconds(RequestTimer.Passed())); + return false; + } + + Ref(); // To prevent destroy during this call in case of execution Finish + size_t left = StreamAdaptor_->ProcessNext(); + logCb(left); + if (NextReplyCb_) { + NextReplyCb_(left); + } + // Now it is safe to destroy even if Finish was called + UnRef(); + return true; + } + + bool SetFinishDone(bool ok) { GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_, - ok ? "true" : "false", this->Context.peer().c_str()); - //PrintBackTrace(); + ok ? "true" : "false", this->Context.peer().c_str()); + //PrintBackTrace(); DecRequest(); Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); - return false; - } - - bool SetFinishError(bool ok) { + return false; + } + + bool SetFinishError(bool ok) { GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_, - ok ? "true" : "false", this->Context.peer().c_str()); - if (!SkipUpdateCountersOnError) { + ok ? "true" : "false", this->Context.peer().c_str()); + if (!SkipUpdateCountersOnError) { DecRequest(); Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); - } - return false; - } - - // Returns pointer to IQueueEvent to pass into grpc c runtime - // Implicit C style cast from this to void* is wrong due to multiple inheritance - void* GetGRpcTag() { - return static_cast<IQueueEvent*>(this); - } - - void OnFinish(EQueueEventStatus evStatus) { - if (this->Context.IsCancelled()) { - FinishPromise_.SetValue(EFinishStatus::CANCEL); - } else { - FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); - } - } - + } + return false; + } + + // Returns pointer to IQueueEvent to pass into grpc c runtime + // Implicit C style cast from this to void* is wrong due to multiple inheritance + void* GetGRpcTag() { + return static_cast<IQueueEvent*>(this); + } + + void OnFinish(EQueueEventStatus evStatus) { + if (this->Context.IsCancelled()) { + FinishPromise_.SetValue(EFinishStatus::CANCEL); + } else { + FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); + } + } + bool IncRequest() { if (!Server_->IncRequest()) return false; @@ -480,36 +480,36 @@ private: } using TStateFunc = bool (TThis::*)(bool); - TService* Server_; - TOnRequest Cb_; - TRequestCallback RequestCallback_; - TStreamRequestCallback StreamRequestCallback_; + TService* Server_; + TOnRequest Cb_; + TRequestCallback RequestCallback_; + TStreamRequestCallback StreamRequestCallback_; const char* const Name_; TLoggerPtr Logger_; ICounterBlockPtr Counters_; IGRpcRequestLimiterPtr RequestLimiter_; - + THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_; - THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; - TStateFunc StateFunc_; - TIn* Request_; - - google::protobuf::Arena Arena_; - TOnNextReply NextReplyCb_; - ui32 RequestSize = 0; - ui32 ResponseSize = 0; + THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_; + TStateFunc StateFunc_; + TIn* Request_; + + google::protobuf::Arena Arena_; + TOnNextReply NextReplyCb_; + ui32 RequestSize = 0; + ui32 ResponseSize = 0; ui32 ResponseStatus = 0; THPTimer RequestTimer; - TAuthState AuthState_ = 0; + TAuthState AuthState_ = 0; bool RequestRegistered_ = false; - + using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>; TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish }; - NThreading::TPromise<EFinishStatus> FinishPromise_; - bool SkipUpdateCountersOnError = false; - IStreamAdaptor::TPtr StreamAdaptor_; -}; - + NThreading::TPromise<EFinishStatus> FinishPromise_; + bool SkipUpdateCountersOnError = false; + IStreamAdaptor::TPtr StreamAdaptor_; +}; + template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> { using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>; diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 506221dd988..fcfce1c181a 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -1,33 +1,33 @@ -#pragma once - +#pragma once + #include <google/protobuf/message.h> #include <library/cpp/threading/future/future.h> - + #include <grpc++/server_context.h> -namespace grpc { -class ByteBuffer; -} - +namespace grpc { +class ByteBuffer; +} + namespace NGrpc { - -extern const char* GRPC_USER_AGENT_HEADER; - -struct TAuthState { - enum EAuthState { - AS_NOT_PERFORMED, - AS_OK, - AS_FAIL, - AS_UNAVAILABLE - }; - TAuthState(bool needAuth) - : NeedAuth(needAuth) - , State(AS_NOT_PERFORMED) - {} - bool NeedAuth; - EAuthState State; -}; - + +extern const char* GRPC_USER_AGENT_HEADER; + +struct TAuthState { + enum EAuthState { + AS_NOT_PERFORMED, + AS_OK, + AS_FAIL, + AS_UNAVAILABLE + }; + TAuthState(bool needAuth) + : NeedAuth(needAuth) + , State(AS_NOT_PERFORMED) + {} + bool NeedAuth; + EAuthState State; +}; + //! An interface that may be used to limit concurrency of requests class IGRpcRequestLimiter: public TThrRefBase { @@ -38,79 +38,79 @@ public: using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>; -//! State of current request +//! State of current request class IRequestContextBase: public TThrRefBase { -public: - enum class EFinishStatus { - OK, - ERROR, - CANCEL - }; - using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>; - - using TOnNextReply = std::function<void (size_t left)>; - - //! Get pointer to the request's message. - virtual const NProtoBuf::Message* GetRequest() const = 0; - - //! Get current auth state - virtual TAuthState& GetAuthState() = 0; - - //! Send common response (The request shoult be created for protobuf response type) - //! Implementation can swap protobuf message +public: + enum class EFinishStatus { + OK, + ERROR, + CANCEL + }; + using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>; + + using TOnNextReply = std::function<void (size_t left)>; + + //! Get pointer to the request's message. + virtual const NProtoBuf::Message* GetRequest() const = 0; + + //! Get current auth state + virtual TAuthState& GetAuthState() = 0; + + //! Send common response (The request shoult be created for protobuf response type) + //! Implementation can swap protobuf message virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; - + //! Send serialised response (The request shoult be created for bytes response type) - //! Implementation can swap ByteBuffer + //! Implementation can swap ByteBuffer virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0; - - //! Send grpc UNAUTHENTICATED status - virtual void ReplyUnauthenticated(const TString& in) = 0; - + + //! Send grpc UNAUTHENTICATED status + virtual void ReplyUnauthenticated(const TString& in) = 0; + //! Send grpc error virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0; - //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise - virtual TInstant Deadline() const = 0; + //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise + virtual TInstant Deadline() const = 0; //! Returns available peer metadata keys virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0; - //! Returns peer optional metavalue + //! Returns peer optional metavalue virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0; - + //! Returns request compression level virtual grpc_compression_level GetCompressionLevel() const = 0; - //! Returns protobuf arena allocator associated with current request - //! Lifetime of the arena is lifetime of the context - virtual google::protobuf::Arena* GetArena() = 0; - - //! Add trailing metadata in to grpc context - //! The metadata will be send at the time of rpc finish - virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0; - + //! Returns protobuf arena allocator associated with current request + //! Lifetime of the arena is lifetime of the context + virtual google::protobuf::Arena* GetArena() = 0; + + //! Add trailing metadata in to grpc context + //! The metadata will be send at the time of rpc finish + virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0; + //! Use validated database name for counters virtual void UseDatabase(const TString& database) = 0; - // Streaming part - - //! Set callback. The callback will be called when response deliverid to the client - //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one - //! reply in flight - virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0; - - //! Finish streaming reply - virtual void FinishStreamingOk() = 0; - - //! Returns future to get cancel of finish notification - virtual TAsyncFinishResult GetFinishFuture() = 0; + // Streaming part + + //! Set callback. The callback will be called when response deliverid to the client + //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one + //! reply in flight + virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0; + + //! Finish streaming reply + virtual void FinishStreamingOk() = 0; + + //! Returns future to get cancel of finish notification + virtual TAsyncFinishResult GetFinishFuture() = 0; //! Returns peer address virtual TString GetPeer() const = 0; //! Returns true if server is using ssl virtual bool SslServer() const = 0; -}; - +}; + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h index 53a3195982d..8e9afe44d53 100644 --- a/library/cpp/grpc/server/grpc_response.h +++ b/library/cpp/grpc/server/grpc_response.h @@ -11,7 +11,7 @@ namespace NGrpc { * Universal response that owns underlying message or buffer. */ template <typename TMsg> -class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { +class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly { friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>; public: diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 3e68b26e1c2..7437b7a8f5e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -1,12 +1,12 @@ -#include "grpc_server.h" - -#include <util/string/join.h> -#include <util/generic/yexception.h> -#include <util/system/thread.h> - -#include <grpc++/resource_quota.h> +#include "grpc_server.h" + +#include <util/string/join.h> +#include <util/generic/yexception.h> +#include <util/system/thread.h> + +#include <grpc++/resource_quota.h> #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> - + #if !defined(_WIN32) && !defined(_WIN64) #include <sys/socket.h> @@ -16,9 +16,9 @@ #endif namespace NGrpc { - -using NThreading::TFuture; - + +using NThreading::TFuture; + static void PullEvents(grpc::ServerCompletionQueue* cq) { TThread::SetCurrentThreadName("grpc_server"); while (true) { @@ -37,33 +37,33 @@ static void PullEvents(grpc::ServerCompletionQueue* cq) { } } -TGRpcServer::TGRpcServer(const TServerOptions& opts) - : Options_(opts) - , Limiter_(Options_.MaxGlobalRequestInFlight) - {} - -TGRpcServer::~TGRpcServer() { - Y_VERIFY(Ts.empty()); - Services_.clear(); -} - -void TGRpcServer::AddService(IGRpcServicePtr service) { - Services_.push_back(service); -} - -void TGRpcServer::Start() { +TGRpcServer::TGRpcServer(const TServerOptions& opts) + : Options_(opts) + , Limiter_(Options_.MaxGlobalRequestInFlight) + {} + +TGRpcServer::~TGRpcServer() { + Y_VERIFY(Ts.empty()); + Services_.clear(); +} + +void TGRpcServer::AddService(IGRpcServicePtr service) { + Services_.push_back(service); +} + +void TGRpcServer::Start() { TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695 - using grpc::ServerBuilder; - using grpc::ResourceQuota; - ServerBuilder builder; + using grpc::ServerBuilder; + using grpc::ResourceQuota; + ServerBuilder builder; auto credentials = grpc::InsecureServerCredentials(); if (Options_.SslData) { - grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; - keycert.cert_chain = std::move(Options_.SslData->Cert); - keycert.private_key = std::move(Options_.SslData->Key); - grpc::SslServerCredentialsOptions sslOps; - sslOps.pem_root_certs = std::move(Options_.SslData->Root); - sslOps.pem_key_cert_pairs.push_back(keycert); + grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; + keycert.cert_chain = std::move(Options_.SslData->Cert); + keycert.private_key = std::move(Options_.SslData->Key); + grpc::SslServerCredentialsOptions sslOps; + sslOps.pem_root_certs = std::move(Options_.SslData->Root); + sslOps.pem_key_cert_pairs.push_back(keycert); credentials = grpc::SslServerCredentials(sslOps); } if (Options_.ExternalListener) { @@ -72,58 +72,58 @@ void TGRpcServer::Start() { credentials )); } else { - builder.AddListeningPort(server_address, credentials); - } - builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); - builder.SetMaxSendMessageSize(Options_.MaxMessageSize); - for (IGRpcServicePtr service : Services_) { + builder.AddListeningPort(server_address, credentials); + } + builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); + builder.SetMaxSendMessageSize(Options_.MaxMessageSize); + for (IGRpcServicePtr service : Services_) { service->SetServerOptions(Options_); - builder.RegisterService(service->GetService()); - service->SetGlobalLimiterHandle(&Limiter_); - } - + builder.RegisterService(service->GetService()); + service->SetGlobalLimiterHandle(&Limiter_); + } + class TKeepAliveOption: public grpc::ServerBuilderOption { - public: - TKeepAliveOption(int idle, int interval) - : Idle(idle) - , Interval(interval) - , KeepAliveEnabled(true) - {} - - TKeepAliveOption() - : Idle(0) - , Interval(0) - , KeepAliveEnabled(false) - {} - - void UpdateArguments(grpc::ChannelArguments *args) override { - args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); - args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); - if (KeepAliveEnabled) { - args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); - args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); - args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); - args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); - args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); - } - } - - void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override - {} - private: - const int Idle; - const int Interval; - const bool KeepAliveEnabled; - }; - - if (Options_.KeepAliveEnable) { - builder.SetOption(std::make_unique<TKeepAliveOption>( - Options_.KeepAliveIdleTimeoutTriggerSec, - Options_.KeepAliveProbeIntervalSec)); - } else { - builder.SetOption(std::make_unique<TKeepAliveOption>()); - } - + public: + TKeepAliveOption(int idle, int interval) + : Idle(idle) + , Interval(interval) + , KeepAliveEnabled(true) + {} + + TKeepAliveOption() + : Idle(0) + , Interval(0) + , KeepAliveEnabled(false) + {} + + void UpdateArguments(grpc::ChannelArguments *args) override { + args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); + args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); + if (KeepAliveEnabled) { + args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); + } + } + + void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override + {} + private: + const int Idle; + const int Interval; + const bool KeepAliveEnabled; + }; + + if (Options_.KeepAliveEnable) { + builder.SetOption(std::make_unique<TKeepAliveOption>( + Options_.KeepAliveIdleTimeoutTriggerSec, + Options_.KeepAliveProbeIntervalSec)); + } else { + builder.SetOption(std::make_unique<TKeepAliveOption>()); + } + if (Options_.UseCompletionQueuePerThread) { for (size_t i = 0; i < Options_.WorkerThreads; ++i) { CQS_.push_back(builder.AddCompletionQueue()); @@ -132,30 +132,30 @@ void TGRpcServer::Start() { CQS_.push_back(builder.AddCompletionQueue()); } - if (Options_.GRpcMemoryQuotaBytes) { + if (Options_.GRpcMemoryQuotaBytes) { // See details KIKIMR-6932 - /* - grpc::ResourceQuota quota("memory_bound"); - quota.Resize(Options_.GRpcMemoryQuotaBytes); - - builder.SetResourceQuota(quota); - */ + /* + grpc::ResourceQuota quota("memory_bound"); + quota.Resize(Options_.GRpcMemoryQuotaBytes); + + builder.SetResourceQuota(quota); + */ Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl; - } + } Options_.ServerBuilderMutator(builder); builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel); - - Server_ = builder.BuildAndStart(); - if (!Server_) { + + Server_ = builder.BuildAndStart(); + if (!Server_) { ythrow yexception() << "can't start grpc server on " << server_address; - } + } size_t index = 0; - for (IGRpcServicePtr service : Services_) { + for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); - } - + } + if (Options_.UseCompletionQueuePerThread) { for (size_t i = 0; i < Options_.WorkerThreads; ++i) { auto* cq = &CQS_[i]; @@ -170,71 +170,71 @@ void TGRpcServer::Start() { PullEvents(cq->get()); })); } - } + } if (Options_.ExternalListener) { Options_.ExternalListener->Start(); } -} - -void TGRpcServer::Stop() { - for (auto& service : Services_) { - service->StopService(); - } - - auto now = TInstant::Now(); - - if (Server_) { - i64 sec = Options_.GRpcShutdownDeadline.Seconds(); - Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); - i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); - Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); - } - +} + +void TGRpcServer::Stop() { + for (auto& service : Services_) { + service->StopService(); + } + + auto now = TInstant::Now(); + + if (Server_) { + i64 sec = Options_.GRpcShutdownDeadline.Seconds(); + Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); + i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); + Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); + } + for (ui64 attempt = 0; ; ++attempt) { bool unsafe = false; - size_t infly = 0; - for (auto& service : Services_) { + size_t infly = 0; + for (auto& service : Services_) { unsafe |= service->IsUnsafeToShutdown(); infly += service->RequestsInProgress(); - } - + } + if (!unsafe && !infly) - break; + break; - auto spent = (TInstant::Now() - now).SecondsFloat(); + auto spent = (TInstant::Now() - now).SecondsFloat(); if (attempt % 300 == 0) { // don't log too much Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl; } if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) - break; - Sleep(TDuration::MilliSeconds(10)); - } - - // Always shutdown the completion queue after the server. + break; + Sleep(TDuration::MilliSeconds(10)); + } + + // Always shutdown the completion queue after the server. for (auto& cq : CQS_) { cq->Shutdown(); - } - - for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { - (*ti)->Join(); - } - - Ts.clear(); + } + + for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { + (*ti)->Join(); + } + + Ts.clear(); if (Options_.ExternalListener) { Options_.ExternalListener->Stop(); } -} - -ui16 TGRpcServer::GetPort() const { - return Options_.Port; -} - -TString TGRpcServer::GetHost() const { - return Options_.Host; -} - +} + +ui16 TGRpcServer::GetPort() const { + return Options_.Port; +} + +TString TGRpcServer::GetHost() const { + return Options_.Host; +} + } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index 0a4123d84e1..d6814a90a0d 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,32 +1,32 @@ -#pragma once - +#pragma once + #include "grpc_request_base.h" #include "logger.h" #include <library/cpp/threading/future/future.h> - -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/generic/vector.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> #include <util/generic/maybe.h> -#include <util/generic/queue.h> -#include <util/generic/hash_set.h> -#include <util/system/types.h> -#include <util/system/mutex.h> +#include <util/generic/queue.h> +#include <util/generic/hash_set.h> +#include <util/system/types.h> +#include <util/system/mutex.h> #include <util/thread/factory.h> - -#include <grpc++/grpc++.h> - + +#include <grpc++/grpc++.h> + namespace NGrpc { - -constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; - -struct TSslData { - TString Cert; - TString Key; - TString Root; -}; - + +constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; + +struct TSslData { + TString Cert; + TString Key; + TString Root; +}; + struct IExternalListener : public TThrRefBase { @@ -36,55 +36,55 @@ struct IExternalListener virtual void Stop() = 0; }; -//! Server's options. -struct TServerOptions { -#define DECLARE_FIELD(name, type, default) \ - type name{default}; \ - inline TServerOptions& Set##name(const type& value) { \ - name = value; \ - return *this; \ - } - - //! Hostname of server to bind to. - DECLARE_FIELD(Host, TString, "[::]"); - //! Service port. - DECLARE_FIELD(Port, ui16, 0); - - //! Number of worker threads. - DECLARE_FIELD(WorkerThreads, size_t, 2); - +//! Server's options. +struct TServerOptions { +#define DECLARE_FIELD(name, type, default) \ + type name{default}; \ + inline TServerOptions& Set##name(const type& value) { \ + name = value; \ + return *this; \ + } + + //! Hostname of server to bind to. + DECLARE_FIELD(Host, TString, "[::]"); + //! Service port. + DECLARE_FIELD(Port, ui16, 0); + + //! Number of worker threads. + DECLARE_FIELD(WorkerThreads, size_t, 2); + //! Create one completion queue per thread DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); - //! Memory quota size for grpc server in bytes. Zero means unlimited. - DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); - - //! How long to wait until pending rpcs are forcefully terminated. - DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); - - //! In/Out message size limit - DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); - - //! Use GRpc keepalive - DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); - - //! GRPC_ARG_KEEPALIVE_TIME_MS setting + //! Memory quota size for grpc server in bytes. Zero means unlimited. + DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); + + //! How long to wait until pending rpcs are forcefully terminated. + DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); + + //! In/Out message size limit + DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); + + //! Use GRpc keepalive + DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); + + //! GRPC_ARG_KEEPALIVE_TIME_MS setting DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0); - //! Deprecated, ths option ignored. Will be removed soon. + //! Deprecated, ths option ignored. Will be removed soon. DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0); - //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting + //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0); - //! Max number of requests processing by services (global limit for grpc server) - DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); - - //! SSL server data - DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); - - //! GRPC auth - DECLARE_FIELD(UseAuth, bool, false); + //! Max number of requests processing by services (global limit for grpc server) + DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); + + //! SSL server data + DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); + + //! GRPC auth + DECLARE_FIELD(UseAuth, bool, false); //! Default compression level. Used when no compression options provided by client. // Mapping to particular compression algorithm depends on client. @@ -98,75 +98,75 @@ struct TServerOptions { //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). DECLARE_FIELD(Logger, TLoggerPtr, nullptr); -#undef DECLARE_FIELD -}; - -class IQueueEvent { -public: - virtual ~IQueueEvent() = default; - +#undef DECLARE_FIELD +}; + +class IQueueEvent { +public: + virtual ~IQueueEvent() = default; + //! Execute an action defined by implementation. - virtual bool Execute(bool ok) = 0; - - //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also - // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does - // not implement in flight management. - virtual void Process() {} - - //! Finish and destroy request. - virtual void DestroyRequest() = 0; -}; - -class ICancelableContext { -public: - virtual void Shutdown() = 0; - virtual ~ICancelableContext() = default; -}; - + virtual bool Execute(bool ok) = 0; + + //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also + // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does + // not implement in flight management. + virtual void Process() {} + + //! Finish and destroy request. + virtual void DestroyRequest() = 0; +}; + +class ICancelableContext { +public: + virtual void Shutdown() = 0; + virtual ~ICancelableContext() = default; +}; + template <class TLimit> class TInFlightLimiterImpl { -public: +public: explicit TInFlightLimiterImpl(const TLimit& limit) - : Limit_(limit) - {} - - bool Inc() { - i64 newVal; - i64 prev; - do { - prev = AtomicGet(CurInFlightReqs_); - Y_VERIFY(prev >= 0); - if (Limit_ && prev > Limit_) { - return false; - } - newVal = prev + 1; - } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); - return true; - } - - void Dec() { - i64 newVal = AtomicDecrement(CurInFlightReqs_); - Y_VERIFY(newVal >= 0); - } - - i64 GetCurrentInFlight() const { - return AtomicGet(CurInFlightReqs_); - } - -private: + : Limit_(limit) + {} + + bool Inc() { + i64 newVal; + i64 prev; + do { + prev = AtomicGet(CurInFlightReqs_); + Y_VERIFY(prev >= 0); + if (Limit_ && prev > Limit_) { + return false; + } + newVal = prev + 1; + } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); + return true; + } + + void Dec() { + i64 newVal = AtomicDecrement(CurInFlightReqs_); + Y_VERIFY(newVal >= 0); + } + + i64 GetCurrentInFlight() const { + return AtomicGet(CurInFlightReqs_); + } + +private: const TLimit Limit_; - TAtomic CurInFlightReqs_ = 0; -}; - + TAtomic CurInFlightReqs_ = 0; +}; + using TGlobalLimiter = TInFlightLimiterImpl<i64>; class IGRpcService: public TThrRefBase { -public: - virtual grpc::Service* GetService() = 0; - virtual void StopService() noexcept = 0; +public: + virtual grpc::Service* GetService() = 0; + virtual void StopService() noexcept = 0; virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; - virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; @@ -175,11 +175,11 @@ public: * service to inspect server options and initialize accordingly. */ virtual void SetServerOptions(const TServerOptions& options) = 0; -}; - -template<typename T> +}; + +template<typename T> class TGrpcServiceBase: public IGRpcService { -public: +public: class TShutdownGuard { using TOwner = TGrpcServiceBase<T>; friend class TGrpcServiceBase<T>; @@ -232,20 +232,20 @@ public: }; public: - using TCurrentGRpcService = T; - - void StopService() noexcept override { - with_lock(Lock_) { + using TCurrentGRpcService = T; + + void StopService() noexcept override { + with_lock(Lock_) { AtomicSet(ShuttingDown_, 1); - // Send TryCansel to event (can be send after finishing). - // Actual dtors will be called from grpc thread, so deadlock impossible - for (auto* request : Requests_) { - request->Shutdown(); - } - } - } - + // Send TryCansel to event (can be send after finishing). + // Actual dtors will be called from grpc thread, so deadlock impossible + for (auto* request : Requests_) { + request->Shutdown(); + } + } + } + TShutdownGuard ProtectShutdown() noexcept { AtomicIncrement(GuardCount_); if (IsShuttingDown()) { @@ -261,35 +261,35 @@ public: } size_t RequestsInProgress() const override { - size_t c = 0; - with_lock(Lock_) { - c = Requests_.size(); - } - return c; - } - + size_t c = 0; + with_lock(Lock_) { + c = Requests_.size(); + } + return c; + } + void SetServerOptions(const TServerOptions& options) override { SslServer_ = bool(options.SslData); NeedAuth_ = options.UseAuth; - } - - void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} - - //! Check if the server is going to shut down. - bool IsShuttingDown() const { - return AtomicGet(ShuttingDown_); - } - + } + + void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} + + //! Check if the server is going to shut down. + bool IsShuttingDown() const { + return AtomicGet(ShuttingDown_); + } + bool SslServer() const { return SslServer_; } - bool NeedAuth() const { - return NeedAuth_; - } - + bool NeedAuth() const { + return NeedAuth_; + } + bool RegisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { + with_lock(Lock_) { auto r = Requests_.emplace(req); Y_VERIFY(r.second, "Ctx already registered"); @@ -298,59 +298,59 @@ public: Requests_.erase(r.first); return false; } - } + } return true; - } - - void DeregisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { + } + + void DeregisterRequestCtx(ICancelableContext* req) { + with_lock(Lock_) { Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); - } - } - -protected: - using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; - TGrpcAsyncService Service_; - + } + } + +protected: + using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; + TGrpcAsyncService Service_; + TGrpcAsyncService* GetService() override { - return &Service_; - } - -private: - TAtomic ShuttingDown_ = 0; + return &Service_; + } + +private: + TAtomic ShuttingDown_ = 0; TAtomic GuardCount_ = 0; - + bool SslServer_ = false; - bool NeedAuth_ = false; - - THashSet<ICancelableContext*> Requests_; - TAdaptiveLock Lock_; -}; - + bool NeedAuth_ = false; + + THashSet<ICancelableContext*> Requests_; + TAdaptiveLock Lock_; +}; + class TGRpcServer { -public: - using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; - TGRpcServer(const TServerOptions& opts); - ~TGRpcServer(); - void AddService(IGRpcServicePtr service); - void Start(); - // Send stop to registred services and call Shutdown on grpc server - // This method MUST be called before destroying TGRpcServer - void Stop(); - ui16 GetPort() const; - TString GetHost() const; - -private: +public: + using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; + TGRpcServer(const TServerOptions& opts); + ~TGRpcServer(); + void AddService(IGRpcServicePtr service); + void Start(); + // Send stop to registred services and call Shutdown on grpc server + // This method MUST be called before destroying TGRpcServer + void Stop(); + ui16 GetPort() const; + TString GetHost() const; + +private: using IThreadRef = TAutoPtr<IThreadFactory::IThread>; - - const TServerOptions Options_; - std::unique_ptr<grpc::Server> Server_; + + const TServerOptions Options_; + std::unique_ptr<grpc::Server> Server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; TVector<IThreadRef> Ts; - + TVector<IGRpcServicePtr> Services_; - TGlobalLimiter Limiter_; -}; - + TGlobalLimiter Limiter_; +}; + } // namespace NGrpc diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp index 0840c77176d..c34d3b8c2bf 100644 --- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -1,121 +1,121 @@ #include <library/cpp/grpc/server/grpc_request.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/system/thread.h> -#include <util/thread/pool.h> - + +#include <util/system/thread.h> +#include <util/thread/pool.h> + using namespace NGrpc; - -// Here we emulate stream data producer + +// Here we emulate stream data producer class TOrderedProducer: public TThread { -public: - TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) - : TThread(&ThreadProc, this) - , Adaptor_(adaptor) - , Max_(max) - , WithSleep_(withSleep) - , ConsumerOp_(std::move(consumerOp)) - {} - - static void* ThreadProc(void* _this) { - SetCurrentThreadName("OrderedProducerThread"); - static_cast<TOrderedProducer*>(_this)->Exec(); - return nullptr; - } - - void Exec() { - for (ui64 i = 0; i < Max_; i++) { - auto cb = [i, this]() mutable { - ConsumerOp_(i); - }; - Adaptor_->Enqueue(std::move(cb), false); - if (WithSleep_ && (i % 256 == 0)) { - Sleep(TDuration::MilliSeconds(10)); - } - } - } - -private: - IStreamAdaptor* Adaptor_; - const ui64 Max_; - const bool WithSleep_; - std::function<void(ui64)> ConsumerOp_; -}; - -Y_UNIT_TEST_SUITE(StreamAdaptor) { - static void OrderingTest(size_t threads, bool withSleep) { - - auto adaptor = CreateStreamAdaptor(); - - const i64 max = 10000; - - // Here we will emulate grpc stream (NextReply call after writing) +public: + TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) + : TThread(&ThreadProc, this) + , Adaptor_(adaptor) + , Max_(max) + , WithSleep_(withSleep) + , ConsumerOp_(std::move(consumerOp)) + {} + + static void* ThreadProc(void* _this) { + SetCurrentThreadName("OrderedProducerThread"); + static_cast<TOrderedProducer*>(_this)->Exec(); + return nullptr; + } + + void Exec() { + for (ui64 i = 0; i < Max_; i++) { + auto cb = [i, this]() mutable { + ConsumerOp_(i); + }; + Adaptor_->Enqueue(std::move(cb), false); + if (WithSleep_ && (i % 256 == 0)) { + Sleep(TDuration::MilliSeconds(10)); + } + } + } + +private: + IStreamAdaptor* Adaptor_; + const ui64 Max_; + const bool WithSleep_; + std::function<void(ui64)> ConsumerOp_; +}; + +Y_UNIT_TEST_SUITE(StreamAdaptor) { + static void OrderingTest(size_t threads, bool withSleep) { + + auto adaptor = CreateStreamAdaptor(); + + const i64 max = 10000; + + // Here we will emulate grpc stream (NextReply call after writing) std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false))); - // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) - consumerQueue->Start(threads, 1); - - // Non atomic!!! Stream adaptor must protect us - ui64 curVal = 0; - - // Used just to wait in the main thread - TAtomic finished = false; - auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { - // Check no reordering inside stream adaptor - // and no simultanious consumer Op call - UNIT_ASSERT_VALUES_EQUAL(curVal, i); - curVal++; - // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext - // so compare here and set after - bool tmp = curVal == max; - bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { - // Additional check the value still same - // run under tsan makes sure no consumer Op call before we call ProcessNext - UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); - ptr->ProcessNext(); - // Reordering after ProcessNext is possible, so check tmp and set finished to true - if (tmp) - AtomicSet(finished, true); - }); - UNIT_ASSERT(res); - }; - - TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); - - producer.Start(); - producer.Join(); - - while (!AtomicGet(finished)) - { - Sleep(TDuration::MilliSeconds(100)); - } - - consumerQueue->Stop(); - - UNIT_ASSERT_VALUES_EQUAL(curVal, max); - } - - Y_UNIT_TEST(OrderingOneThread) { - OrderingTest(1, false); - } - - Y_UNIT_TEST(OrderingTwoThreads) { - OrderingTest(2, false); - } - - Y_UNIT_TEST(OrderingManyThreads) { - OrderingTest(10, false); - } - - Y_UNIT_TEST(OrderingOneThreadWithSleep) { - OrderingTest(1, true); - } - - Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { - OrderingTest(2, true); - } - - Y_UNIT_TEST(OrderingManyThreadsWithSleep) { - OrderingTest(10, true); - } -} + // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) + consumerQueue->Start(threads, 1); + + // Non atomic!!! Stream adaptor must protect us + ui64 curVal = 0; + + // Used just to wait in the main thread + TAtomic finished = false; + auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { + // Check no reordering inside stream adaptor + // and no simultanious consumer Op call + UNIT_ASSERT_VALUES_EQUAL(curVal, i); + curVal++; + // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext + // so compare here and set after + bool tmp = curVal == max; + bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { + // Additional check the value still same + // run under tsan makes sure no consumer Op call before we call ProcessNext + UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); + ptr->ProcessNext(); + // Reordering after ProcessNext is possible, so check tmp and set finished to true + if (tmp) + AtomicSet(finished, true); + }); + UNIT_ASSERT(res); + }; + + TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); + + producer.Start(); + producer.Join(); + + while (!AtomicGet(finished)) + { + Sleep(TDuration::MilliSeconds(100)); + } + + consumerQueue->Stop(); + + UNIT_ASSERT_VALUES_EQUAL(curVal, max); + } + + Y_UNIT_TEST(OrderingOneThread) { + OrderingTest(1, false); + } + + Y_UNIT_TEST(OrderingTwoThreads) { + OrderingTest(2, false); + } + + Y_UNIT_TEST(OrderingManyThreads) { + OrderingTest(10, false); + } + + Y_UNIT_TEST(OrderingOneThreadWithSleep) { + OrderingTest(1, true); + } + + Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { + OrderingTest(2, true); + } + + Y_UNIT_TEST(OrderingManyThreadsWithSleep) { + OrderingTest(10, true); + } +} diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make index 2f7fa1afc2d..feb3291af92 100644 --- a/library/cpp/grpc/server/ut/ya.make +++ b/library/cpp/grpc/server/ut/ya.make @@ -1,21 +1,21 @@ UNITTEST_FOR(library/cpp/grpc/server) - -OWNER( - dcherednik - g:kikimr -) - + +OWNER( + dcherednik + g:kikimr +) + TIMEOUT(600) SIZE(MEDIUM) - -PEERDIR( + +PEERDIR( library/cpp/grpc/server -) - -SRCS( +) + +SRCS( grpc_response_ut.cpp - stream_adaptor_ut.cpp -) - -END() - + stream_adaptor_ut.cpp +) + +END() + diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make index 75499b926a0..356a1b6793d 100644 --- a/library/cpp/grpc/server/ya.make +++ b/library/cpp/grpc/server/ya.make @@ -1,25 +1,25 @@ -LIBRARY() - -OWNER( - dcherednik - g:kikimr -) - -SRCS( - event_callback.cpp - grpc_request.cpp - grpc_server.cpp - grpc_counters.cpp -) - -GENERATE_ENUM_SERIALIZATION(grpc_request_base.h) - -PEERDIR( - contrib/libs/grpc +LIBRARY() + +OWNER( + dcherednik + g:kikimr +) + +SRCS( + event_callback.cpp + grpc_request.cpp + grpc_server.cpp + grpc_counters.cpp +) + +GENERATE_ENUM_SERIALIZATION(grpc_request_base.h) + +PEERDIR( + contrib/libs/grpc library/cpp/monlib/dynamic_counters/percentile -) - -END() +) + +END() RECURSE_FOR_TESTS(ut) |