aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/server/grpc_server.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.h')
-rw-r--r--library/cpp/grpc/server/grpc_server.h356
1 files changed, 356 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
new file mode 100644
index 0000000000..d6814a90a0
--- /dev/null
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -0,0 +1,356 @@
+#pragma once
+
+#include "grpc_request_base.h"
+#include "logger.h"
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/generic/maybe.h>
+#include <util/generic/queue.h>
+#include <util/generic/hash_set.h>
+#include <util/system/types.h>
+#include <util/system/mutex.h>
+#include <util/thread/factory.h>
+
+#include <grpc++/grpc++.h>
+
+namespace NGrpc {
+
+constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
+
+struct TSslData {
+ TString Cert;
+ TString Key;
+ TString Root;
+};
+
+struct IExternalListener
+ : public TThrRefBase
+{
+ using TPtr = TIntrusivePtr<IExternalListener>;
+ virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;
+ virtual void Start() = 0;
+ virtual void Stop() = 0;
+};
+
+//! Server's options.
+struct TServerOptions {
+#define DECLARE_FIELD(name, type, default) \
+ type name{default}; \
+ inline TServerOptions& Set##name(const type& value) { \
+ name = value; \
+ return *this; \
+ }
+
+ //! Hostname of server to bind to.
+ DECLARE_FIELD(Host, TString, "[::]");
+ //! Service port.
+ DECLARE_FIELD(Port, ui16, 0);
+
+ //! Number of worker threads.
+ DECLARE_FIELD(WorkerThreads, size_t, 2);
+
+ //! Create one completion queue per thread
+ DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
+
+ //! Memory quota size for grpc server in bytes. Zero means unlimited.
+ DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
+
+ //! How long to wait until pending rpcs are forcefully terminated.
+ DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
+
+ //! In/Out message size limit
+ DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
+
+ //! Use GRpc keepalive
+ DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
+
+ //! GRPC_ARG_KEEPALIVE_TIME_MS setting
+ DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0);
+
+ //! Deprecated, ths option ignored. Will be removed soon.
+ DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0);
+
+ //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
+ DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0);
+
+ //! Max number of requests processing by services (global limit for grpc server)
+ DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
+
+ //! SSL server data
+ DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
+
+ //! GRPC auth
+ DECLARE_FIELD(UseAuth, bool, false);
+
+ //! Default compression level. Used when no compression options provided by client.
+ // Mapping to particular compression algorithm depends on client.
+ DECLARE_FIELD(DefaultCompressionLevel, grpc_compression_level, GRPC_COMPRESS_LEVEL_NONE);
+
+ //! Custom configurator for ServerBuilder.
+ DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){});
+
+ DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
+
+ //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
+ DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
+
+#undef DECLARE_FIELD
+};
+
+class IQueueEvent {
+public:
+ virtual ~IQueueEvent() = default;
+
+ //! Execute an action defined by implementation.
+ virtual bool Execute(bool ok) = 0;
+
+ //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
+ // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
+ // not implement in flight management.
+ virtual void Process() {}
+
+ //! Finish and destroy request.
+ virtual void DestroyRequest() = 0;
+};
+
+class ICancelableContext {
+public:
+ virtual void Shutdown() = 0;
+ virtual ~ICancelableContext() = default;
+};
+
+template <class TLimit>
+class TInFlightLimiterImpl {
+public:
+ explicit TInFlightLimiterImpl(const TLimit& limit)
+ : Limit_(limit)
+ {}
+
+ bool Inc() {
+ i64 newVal;
+ i64 prev;
+ do {
+ prev = AtomicGet(CurInFlightReqs_);
+ Y_VERIFY(prev >= 0);
+ if (Limit_ && prev > Limit_) {
+ return false;
+ }
+ newVal = prev + 1;
+ } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
+ return true;
+ }
+
+ void Dec() {
+ i64 newVal = AtomicDecrement(CurInFlightReqs_);
+ Y_VERIFY(newVal >= 0);
+ }
+
+ i64 GetCurrentInFlight() const {
+ return AtomicGet(CurInFlightReqs_);
+ }
+
+private:
+ const TLimit Limit_;
+ TAtomic CurInFlightReqs_ = 0;
+};
+
+using TGlobalLimiter = TInFlightLimiterImpl<i64>;
+
+
+class IGRpcService: public TThrRefBase {
+public:
+ virtual grpc::Service* GetService() = 0;
+ virtual void StopService() noexcept = 0;
+ virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
+ virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
+ virtual bool IsUnsafeToShutdown() const = 0;
+ virtual size_t RequestsInProgress() const = 0;
+
+ /**
+ * Called before service is added to the server builder. This allows
+ * service to inspect server options and initialize accordingly.
+ */
+ virtual void SetServerOptions(const TServerOptions& options) = 0;
+};
+
+template<typename T>
+class TGrpcServiceBase: public IGRpcService {
+public:
+ class TShutdownGuard {
+ using TOwner = TGrpcServiceBase<T>;
+ friend class TGrpcServiceBase<T>;
+
+ public:
+ TShutdownGuard()
+ : Owner(nullptr)
+ { }
+
+ ~TShutdownGuard() {
+ Release();
+ }
+
+ TShutdownGuard(TShutdownGuard&& other)
+ : Owner(other.Owner)
+ {
+ other.Owner = nullptr;
+ }
+
+ TShutdownGuard& operator=(TShutdownGuard&& other) {
+ if (Y_LIKELY(this != &other)) {
+ Release();
+ Owner = other.Owner;
+ other.Owner = nullptr;
+ }
+ return *this;
+ }
+
+ explicit operator bool() const {
+ return bool(Owner);
+ }
+
+ void Release() {
+ if (Owner) {
+ AtomicDecrement(Owner->GuardCount_);
+ Owner = nullptr;
+ }
+ }
+
+ TShutdownGuard(const TShutdownGuard&) = delete;
+ TShutdownGuard& operator=(const TShutdownGuard&) = delete;
+
+ private:
+ explicit TShutdownGuard(TOwner* owner)
+ : Owner(owner)
+ { }
+
+ private:
+ TOwner* Owner;
+ };
+
+public:
+ using TCurrentGRpcService = T;
+
+ void StopService() noexcept override {
+ with_lock(Lock_) {
+ AtomicSet(ShuttingDown_, 1);
+
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : Requests_) {
+ request->Shutdown();
+ }
+ }
+ }
+
+ TShutdownGuard ProtectShutdown() noexcept {
+ AtomicIncrement(GuardCount_);
+ if (IsShuttingDown()) {
+ AtomicDecrement(GuardCount_);
+ return { };
+ }
+
+ return TShutdownGuard(this);
+ };
+
+ bool IsUnsafeToShutdown() const override {
+ return AtomicGet(GuardCount_) > 0;
+ }
+
+ size_t RequestsInProgress() const override {
+ size_t c = 0;
+ with_lock(Lock_) {
+ c = Requests_.size();
+ }
+ return c;
+ }
+
+ void SetServerOptions(const TServerOptions& options) override {
+ SslServer_ = bool(options.SslData);
+ NeedAuth_ = options.UseAuth;
+ }
+
+ void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
+
+ //! Check if the server is going to shut down.
+ bool IsShuttingDown() const {
+ return AtomicGet(ShuttingDown_);
+ }
+
+ bool SslServer() const {
+ return SslServer_;
+ }
+
+ bool NeedAuth() const {
+ return NeedAuth_;
+ }
+
+ bool RegisterRequestCtx(ICancelableContext* req) {
+ with_lock(Lock_) {
+ auto r = Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
+
+ if (IsShuttingDown()) {
+ // Server is already shutting down
+ Requests_.erase(r.first);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ void DeregisterRequestCtx(ICancelableContext* req) {
+ with_lock(Lock_) {
+ Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ }
+ }
+
+protected:
+ using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
+ TGrpcAsyncService Service_;
+
+ TGrpcAsyncService* GetService() override {
+ return &Service_;
+ }
+
+private:
+ TAtomic ShuttingDown_ = 0;
+ TAtomic GuardCount_ = 0;
+
+ bool SslServer_ = false;
+ bool NeedAuth_ = false;
+
+ THashSet<ICancelableContext*> Requests_;
+ TAdaptiveLock Lock_;
+};
+
+class TGRpcServer {
+public:
+ using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
+ TGRpcServer(const TServerOptions& opts);
+ ~TGRpcServer();
+ void AddService(IGRpcServicePtr service);
+ void Start();
+ // Send stop to registred services and call Shutdown on grpc server
+ // This method MUST be called before destroying TGRpcServer
+ void Stop();
+ ui16 GetPort() const;
+ TString GetHost() const;
+
+private:
+ using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
+
+ const TServerOptions Options_;
+ std::unique_ptr<grpc::Server> Server_;
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
+ TVector<IThreadRef> Ts;
+
+ TVector<IGRpcServicePtr> Services_;
+ TGlobalLimiter Limiter_;
+};
+
+} // namespace NGrpc