aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
commit4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/grpc_server.h
parent17e20fa084178ddcb16255f974dbde74fb93608b (diff)
downloadydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.h')
-rw-r--r--library/cpp/grpc/server/grpc_server.h404
1 files changed, 202 insertions, 202 deletions
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 0a4123d84e..d6814a90a0 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -1,32 +1,32 @@
-#pragma once
-
+#pragma once
+
#include "grpc_request_base.h"
#include "logger.h"
#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/generic/vector.h>
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
#include <util/generic/maybe.h>
-#include <util/generic/queue.h>
-#include <util/generic/hash_set.h>
-#include <util/system/types.h>
-#include <util/system/mutex.h>
+#include <util/generic/queue.h>
+#include <util/generic/hash_set.h>
+#include <util/system/types.h>
+#include <util/system/mutex.h>
#include <util/thread/factory.h>
-
-#include <grpc++/grpc++.h>
-
+
+#include <grpc++/grpc++.h>
+
namespace NGrpc {
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
-struct TSslData {
- TString Cert;
- TString Key;
- TString Root;
-};
-
+
+constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
+
+struct TSslData {
+ TString Cert;
+ TString Key;
+ TString Root;
+};
+
struct IExternalListener
: public TThrRefBase
{
@@ -36,55 +36,55 @@ struct IExternalListener
virtual void Stop() = 0;
};
-//! Server's options.
-struct TServerOptions {
-#define DECLARE_FIELD(name, type, default) \
- type name{default}; \
- inline TServerOptions& Set##name(const type& value) { \
- name = value; \
- return *this; \
- }
-
- //! Hostname of server to bind to.
- DECLARE_FIELD(Host, TString, "[::]");
- //! Service port.
- DECLARE_FIELD(Port, ui16, 0);
-
- //! Number of worker threads.
- DECLARE_FIELD(WorkerThreads, size_t, 2);
-
+//! Server's options.
+struct TServerOptions {
+#define DECLARE_FIELD(name, type, default) \
+ type name{default}; \
+ inline TServerOptions& Set##name(const type& value) { \
+ name = value; \
+ return *this; \
+ }
+
+ //! Hostname of server to bind to.
+ DECLARE_FIELD(Host, TString, "[::]");
+ //! Service port.
+ DECLARE_FIELD(Port, ui16, 0);
+
+ //! Number of worker threads.
+ DECLARE_FIELD(WorkerThreads, size_t, 2);
+
//! Create one completion queue per thread
DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
- //! Memory quota size for grpc server in bytes. Zero means unlimited.
- DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
-
- //! How long to wait until pending rpcs are forcefully terminated.
- DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
-
- //! In/Out message size limit
- DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
-
- //! Use GRpc keepalive
- DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
-
- //! GRPC_ARG_KEEPALIVE_TIME_MS setting
+ //! Memory quota size for grpc server in bytes. Zero means unlimited.
+ DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
+
+ //! How long to wait until pending rpcs are forcefully terminated.
+ DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
+
+ //! In/Out message size limit
+ DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
+
+ //! Use GRpc keepalive
+ DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
+
+ //! GRPC_ARG_KEEPALIVE_TIME_MS setting
DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0);
- //! Deprecated, ths option ignored. Will be removed soon.
+ //! Deprecated, ths option ignored. Will be removed soon.
DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0);
- //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
+ //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0);
- //! Max number of requests processing by services (global limit for grpc server)
- DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
-
- //! SSL server data
- DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
-
- //! GRPC auth
- DECLARE_FIELD(UseAuth, bool, false);
+ //! Max number of requests processing by services (global limit for grpc server)
+ DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
+
+ //! SSL server data
+ DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
+
+ //! GRPC auth
+ DECLARE_FIELD(UseAuth, bool, false);
//! Default compression level. Used when no compression options provided by client.
// Mapping to particular compression algorithm depends on client.
@@ -98,75 +98,75 @@ struct TServerOptions {
//! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
-#undef DECLARE_FIELD
-};
-
-class IQueueEvent {
-public:
- virtual ~IQueueEvent() = default;
-
+#undef DECLARE_FIELD
+};
+
+class IQueueEvent {
+public:
+ virtual ~IQueueEvent() = default;
+
//! Execute an action defined by implementation.
- virtual bool Execute(bool ok) = 0;
-
- //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
- // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
- // not implement in flight management.
- virtual void Process() {}
-
- //! Finish and destroy request.
- virtual void DestroyRequest() = 0;
-};
-
-class ICancelableContext {
-public:
- virtual void Shutdown() = 0;
- virtual ~ICancelableContext() = default;
-};
-
+ virtual bool Execute(bool ok) = 0;
+
+ //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
+ // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
+ // not implement in flight management.
+ virtual void Process() {}
+
+ //! Finish and destroy request.
+ virtual void DestroyRequest() = 0;
+};
+
+class ICancelableContext {
+public:
+ virtual void Shutdown() = 0;
+ virtual ~ICancelableContext() = default;
+};
+
template <class TLimit>
class TInFlightLimiterImpl {
-public:
+public:
explicit TInFlightLimiterImpl(const TLimit& limit)
- : Limit_(limit)
- {}
-
- bool Inc() {
- i64 newVal;
- i64 prev;
- do {
- prev = AtomicGet(CurInFlightReqs_);
- Y_VERIFY(prev >= 0);
- if (Limit_ && prev > Limit_) {
- return false;
- }
- newVal = prev + 1;
- } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
- return true;
- }
-
- void Dec() {
- i64 newVal = AtomicDecrement(CurInFlightReqs_);
- Y_VERIFY(newVal >= 0);
- }
-
- i64 GetCurrentInFlight() const {
- return AtomicGet(CurInFlightReqs_);
- }
-
-private:
+ : Limit_(limit)
+ {}
+
+ bool Inc() {
+ i64 newVal;
+ i64 prev;
+ do {
+ prev = AtomicGet(CurInFlightReqs_);
+ Y_VERIFY(prev >= 0);
+ if (Limit_ && prev > Limit_) {
+ return false;
+ }
+ newVal = prev + 1;
+ } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
+ return true;
+ }
+
+ void Dec() {
+ i64 newVal = AtomicDecrement(CurInFlightReqs_);
+ Y_VERIFY(newVal >= 0);
+ }
+
+ i64 GetCurrentInFlight() const {
+ return AtomicGet(CurInFlightReqs_);
+ }
+
+private:
const TLimit Limit_;
- TAtomic CurInFlightReqs_ = 0;
-};
-
+ TAtomic CurInFlightReqs_ = 0;
+};
+
using TGlobalLimiter = TInFlightLimiterImpl<i64>;
class IGRpcService: public TThrRefBase {
-public:
- virtual grpc::Service* GetService() = 0;
- virtual void StopService() noexcept = 0;
+public:
+ virtual grpc::Service* GetService() = 0;
+ virtual void StopService() noexcept = 0;
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
- virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
+ virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;
@@ -175,11 +175,11 @@ public:
* service to inspect server options and initialize accordingly.
*/
virtual void SetServerOptions(const TServerOptions& options) = 0;
-};
-
-template<typename T>
+};
+
+template<typename T>
class TGrpcServiceBase: public IGRpcService {
-public:
+public:
class TShutdownGuard {
using TOwner = TGrpcServiceBase<T>;
friend class TGrpcServiceBase<T>;
@@ -232,20 +232,20 @@ public:
};
public:
- using TCurrentGRpcService = T;
-
- void StopService() noexcept override {
- with_lock(Lock_) {
+ using TCurrentGRpcService = T;
+
+ void StopService() noexcept override {
+ with_lock(Lock_) {
AtomicSet(ShuttingDown_, 1);
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : Requests_) {
- request->Shutdown();
- }
- }
- }
-
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : Requests_) {
+ request->Shutdown();
+ }
+ }
+ }
+
TShutdownGuard ProtectShutdown() noexcept {
AtomicIncrement(GuardCount_);
if (IsShuttingDown()) {
@@ -261,35 +261,35 @@ public:
}
size_t RequestsInProgress() const override {
- size_t c = 0;
- with_lock(Lock_) {
- c = Requests_.size();
- }
- return c;
- }
-
+ size_t c = 0;
+ with_lock(Lock_) {
+ c = Requests_.size();
+ }
+ return c;
+ }
+
void SetServerOptions(const TServerOptions& options) override {
SslServer_ = bool(options.SslData);
NeedAuth_ = options.UseAuth;
- }
-
- void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
-
- //! Check if the server is going to shut down.
- bool IsShuttingDown() const {
- return AtomicGet(ShuttingDown_);
- }
-
+ }
+
+ void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
+
+ //! Check if the server is going to shut down.
+ bool IsShuttingDown() const {
+ return AtomicGet(ShuttingDown_);
+ }
+
bool SslServer() const {
return SslServer_;
}
- bool NeedAuth() const {
- return NeedAuth_;
- }
-
+ bool NeedAuth() const {
+ return NeedAuth_;
+ }
+
bool RegisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
+ with_lock(Lock_) {
auto r = Requests_.emplace(req);
Y_VERIFY(r.second, "Ctx already registered");
@@ -298,59 +298,59 @@ public:
Requests_.erase(r.first);
return false;
}
- }
+ }
return true;
- }
-
- void DeregisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
+ }
+
+ void DeregisterRequestCtx(ICancelableContext* req) {
+ with_lock(Lock_) {
Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
- }
- }
-
-protected:
- using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
- TGrpcAsyncService Service_;
-
+ }
+ }
+
+protected:
+ using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
+ TGrpcAsyncService Service_;
+
TGrpcAsyncService* GetService() override {
- return &Service_;
- }
-
-private:
- TAtomic ShuttingDown_ = 0;
+ return &Service_;
+ }
+
+private:
+ TAtomic ShuttingDown_ = 0;
TAtomic GuardCount_ = 0;
-
+
bool SslServer_ = false;
- bool NeedAuth_ = false;
-
- THashSet<ICancelableContext*> Requests_;
- TAdaptiveLock Lock_;
-};
-
+ bool NeedAuth_ = false;
+
+ THashSet<ICancelableContext*> Requests_;
+ TAdaptiveLock Lock_;
+};
+
class TGRpcServer {
-public:
- using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
- TGRpcServer(const TServerOptions& opts);
- ~TGRpcServer();
- void AddService(IGRpcServicePtr service);
- void Start();
- // Send stop to registred services and call Shutdown on grpc server
- // This method MUST be called before destroying TGRpcServer
- void Stop();
- ui16 GetPort() const;
- TString GetHost() const;
-
-private:
+public:
+ using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
+ TGRpcServer(const TServerOptions& opts);
+ ~TGRpcServer();
+ void AddService(IGRpcServicePtr service);
+ void Start();
+ // Send stop to registred services and call Shutdown on grpc server
+ // This method MUST be called before destroying TGRpcServer
+ void Stop();
+ ui16 GetPort() const;
+ TString GetHost() const;
+
+private:
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
-
- const TServerOptions Options_;
- std::unique_ptr<grpc::Server> Server_;
+
+ const TServerOptions Options_;
+ std::unique_ptr<grpc::Server> Server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;
-
+
TVector<IGRpcServicePtr> Services_;
- TGlobalLimiter Limiter_;
-};
-
+ TGlobalLimiter Limiter_;
+};
+
} // namespace NGrpc