diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:16 +0300 |
commit | 17e20fa084178ddcb16255f974dbde74fb93608b (patch) | |
tree | 39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc/server/grpc_server.h | |
parent | 97df5ca7413550bf233fc6c7210e292fca0a51af (diff) | |
download | ydb-17e20fa084178ddcb16255f974dbde74fb93608b.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.h')
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 404 |
1 files changed, 202 insertions, 202 deletions
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index d6814a90a0..0a4123d84e 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -1,32 +1,32 @@ -#pragma once - +#pragma once + #include "grpc_request_base.h" #include "logger.h" #include <library/cpp/threading/future/future.h> - -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/generic/vector.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> #include <util/generic/maybe.h> -#include <util/generic/queue.h> -#include <util/generic/hash_set.h> -#include <util/system/types.h> -#include <util/system/mutex.h> +#include <util/generic/queue.h> +#include <util/generic/hash_set.h> +#include <util/system/types.h> +#include <util/system/mutex.h> #include <util/thread/factory.h> - -#include <grpc++/grpc++.h> - + +#include <grpc++/grpc++.h> + namespace NGrpc { - -constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; - -struct TSslData { - TString Cert; - TString Key; - TString Root; -}; - + +constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000; + +struct TSslData { + TString Cert; + TString Key; + TString Root; +}; + struct IExternalListener : public TThrRefBase { @@ -36,55 +36,55 @@ struct IExternalListener virtual void Stop() = 0; }; -//! Server's options. -struct TServerOptions { -#define DECLARE_FIELD(name, type, default) \ - type name{default}; \ - inline TServerOptions& Set##name(const type& value) { \ - name = value; \ - return *this; \ - } - - //! Hostname of server to bind to. - DECLARE_FIELD(Host, TString, "[::]"); - //! Service port. - DECLARE_FIELD(Port, ui16, 0); - - //! Number of worker threads. - DECLARE_FIELD(WorkerThreads, size_t, 2); - +//! Server's options. +struct TServerOptions { +#define DECLARE_FIELD(name, type, default) \ + type name{default}; \ + inline TServerOptions& Set##name(const type& value) { \ + name = value; \ + return *this; \ + } + + //! Hostname of server to bind to. + DECLARE_FIELD(Host, TString, "[::]"); + //! Service port. + DECLARE_FIELD(Port, ui16, 0); + + //! Number of worker threads. + DECLARE_FIELD(WorkerThreads, size_t, 2); + //! Create one completion queue per thread DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); - //! Memory quota size for grpc server in bytes. Zero means unlimited. - DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); - - //! How long to wait until pending rpcs are forcefully terminated. - DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); - - //! In/Out message size limit - DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); - - //! Use GRpc keepalive - DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); - - //! GRPC_ARG_KEEPALIVE_TIME_MS setting + //! Memory quota size for grpc server in bytes. Zero means unlimited. + DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0); + + //! How long to wait until pending rpcs are forcefully terminated. + DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30)); + + //! In/Out message size limit + DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT); + + //! Use GRpc keepalive + DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>()); + + //! GRPC_ARG_KEEPALIVE_TIME_MS setting DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0); - //! Deprecated, ths option ignored. Will be removed soon. + //! Deprecated, ths option ignored. Will be removed soon. DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0); - //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting + //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0); - //! Max number of requests processing by services (global limit for grpc server) - DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); - - //! SSL server data - DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); - - //! GRPC auth - DECLARE_FIELD(UseAuth, bool, false); + //! Max number of requests processing by services (global limit for grpc server) + DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000); + + //! SSL server data + DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>()); + + //! GRPC auth + DECLARE_FIELD(UseAuth, bool, false); //! Default compression level. Used when no compression options provided by client. // Mapping to particular compression algorithm depends on client. @@ -98,75 +98,75 @@ struct TServerOptions { //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled). DECLARE_FIELD(Logger, TLoggerPtr, nullptr); -#undef DECLARE_FIELD -}; - -class IQueueEvent { -public: - virtual ~IQueueEvent() = default; - +#undef DECLARE_FIELD +}; + +class IQueueEvent { +public: + virtual ~IQueueEvent() = default; + //! Execute an action defined by implementation. - virtual bool Execute(bool ok) = 0; - - //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also - // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does - // not implement in flight management. - virtual void Process() {} - - //! Finish and destroy request. - virtual void DestroyRequest() = 0; -}; - -class ICancelableContext { -public: - virtual void Shutdown() = 0; - virtual ~ICancelableContext() = default; -}; - + virtual bool Execute(bool ok) = 0; + + //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also + // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does + // not implement in flight management. + virtual void Process() {} + + //! Finish and destroy request. + virtual void DestroyRequest() = 0; +}; + +class ICancelableContext { +public: + virtual void Shutdown() = 0; + virtual ~ICancelableContext() = default; +}; + template <class TLimit> class TInFlightLimiterImpl { -public: +public: explicit TInFlightLimiterImpl(const TLimit& limit) - : Limit_(limit) - {} - - bool Inc() { - i64 newVal; - i64 prev; - do { - prev = AtomicGet(CurInFlightReqs_); - Y_VERIFY(prev >= 0); - if (Limit_ && prev > Limit_) { - return false; - } - newVal = prev + 1; - } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); - return true; - } - - void Dec() { - i64 newVal = AtomicDecrement(CurInFlightReqs_); - Y_VERIFY(newVal >= 0); - } - - i64 GetCurrentInFlight() const { - return AtomicGet(CurInFlightReqs_); - } - -private: + : Limit_(limit) + {} + + bool Inc() { + i64 newVal; + i64 prev; + do { + prev = AtomicGet(CurInFlightReqs_); + Y_VERIFY(prev >= 0); + if (Limit_ && prev > Limit_) { + return false; + } + newVal = prev + 1; + } while (!AtomicCas(&CurInFlightReqs_, newVal, prev)); + return true; + } + + void Dec() { + i64 newVal = AtomicDecrement(CurInFlightReqs_); + Y_VERIFY(newVal >= 0); + } + + i64 GetCurrentInFlight() const { + return AtomicGet(CurInFlightReqs_); + } + +private: const TLimit Limit_; - TAtomic CurInFlightReqs_ = 0; -}; - + TAtomic CurInFlightReqs_ = 0; +}; + using TGlobalLimiter = TInFlightLimiterImpl<i64>; class IGRpcService: public TThrRefBase { -public: - virtual grpc::Service* GetService() = 0; - virtual void StopService() noexcept = 0; +public: + virtual grpc::Service* GetService() = 0; + virtual void StopService() noexcept = 0; virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; - virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; @@ -175,11 +175,11 @@ public: * service to inspect server options and initialize accordingly. */ virtual void SetServerOptions(const TServerOptions& options) = 0; -}; - -template<typename T> +}; + +template<typename T> class TGrpcServiceBase: public IGRpcService { -public: +public: class TShutdownGuard { using TOwner = TGrpcServiceBase<T>; friend class TGrpcServiceBase<T>; @@ -232,20 +232,20 @@ public: }; public: - using TCurrentGRpcService = T; - - void StopService() noexcept override { - with_lock(Lock_) { + using TCurrentGRpcService = T; + + void StopService() noexcept override { + with_lock(Lock_) { AtomicSet(ShuttingDown_, 1); - // Send TryCansel to event (can be send after finishing). - // Actual dtors will be called from grpc thread, so deadlock impossible - for (auto* request : Requests_) { - request->Shutdown(); - } - } - } - + // Send TryCansel to event (can be send after finishing). + // Actual dtors will be called from grpc thread, so deadlock impossible + for (auto* request : Requests_) { + request->Shutdown(); + } + } + } + TShutdownGuard ProtectShutdown() noexcept { AtomicIncrement(GuardCount_); if (IsShuttingDown()) { @@ -261,35 +261,35 @@ public: } size_t RequestsInProgress() const override { - size_t c = 0; - with_lock(Lock_) { - c = Requests_.size(); - } - return c; - } - + size_t c = 0; + with_lock(Lock_) { + c = Requests_.size(); + } + return c; + } + void SetServerOptions(const TServerOptions& options) override { SslServer_ = bool(options.SslData); NeedAuth_ = options.UseAuth; - } - - void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} - - //! Check if the server is going to shut down. - bool IsShuttingDown() const { - return AtomicGet(ShuttingDown_); - } - + } + + void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {} + + //! Check if the server is going to shut down. + bool IsShuttingDown() const { + return AtomicGet(ShuttingDown_); + } + bool SslServer() const { return SslServer_; } - bool NeedAuth() const { - return NeedAuth_; - } - + bool NeedAuth() const { + return NeedAuth_; + } + bool RegisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { + with_lock(Lock_) { auto r = Requests_.emplace(req); Y_VERIFY(r.second, "Ctx already registered"); @@ -298,59 +298,59 @@ public: Requests_.erase(r.first); return false; } - } + } return true; - } - - void DeregisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { + } + + void DeregisterRequestCtx(ICancelableContext* req) { + with_lock(Lock_) { Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); - } - } - -protected: - using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; - TGrpcAsyncService Service_; - + } + } + +protected: + using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService; + TGrpcAsyncService Service_; + TGrpcAsyncService* GetService() override { - return &Service_; - } - -private: - TAtomic ShuttingDown_ = 0; + return &Service_; + } + +private: + TAtomic ShuttingDown_ = 0; TAtomic GuardCount_ = 0; - + bool SslServer_ = false; - bool NeedAuth_ = false; - - THashSet<ICancelableContext*> Requests_; - TAdaptiveLock Lock_; -}; - + bool NeedAuth_ = false; + + THashSet<ICancelableContext*> Requests_; + TAdaptiveLock Lock_; +}; + class TGRpcServer { -public: - using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; - TGRpcServer(const TServerOptions& opts); - ~TGRpcServer(); - void AddService(IGRpcServicePtr service); - void Start(); - // Send stop to registred services and call Shutdown on grpc server - // This method MUST be called before destroying TGRpcServer - void Stop(); - ui16 GetPort() const; - TString GetHost() const; - -private: +public: + using IGRpcServicePtr = TIntrusivePtr<IGRpcService>; + TGRpcServer(const TServerOptions& opts); + ~TGRpcServer(); + void AddService(IGRpcServicePtr service); + void Start(); + // Send stop to registred services and call Shutdown on grpc server + // This method MUST be called before destroying TGRpcServer + void Stop(); + ui16 GetPort() const; + TString GetHost() const; + +private: using IThreadRef = TAutoPtr<IThreadFactory::IThread>; - - const TServerOptions Options_; - std::unique_ptr<grpc::Server> Server_; + + const TServerOptions Options_; + std::unique_ptr<grpc::Server> Server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_; TVector<IThreadRef> Ts; - + TVector<IGRpcServicePtr> Services_; - TGlobalLimiter Limiter_; -}; - + TGlobalLimiter Limiter_; +}; + } // namespace NGrpc |