diff options
author | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:16 +0300 |
---|---|---|
committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:50:16 +0300 |
commit | 17e20fa084178ddcb16255f974dbde74fb93608b (patch) | |
tree | 39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc/server/grpc_server.cpp | |
parent | 97df5ca7413550bf233fc6c7210e292fca0a51af (diff) |
Restoring authorship annotation for Daniil Cherednik <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.cpp')
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 290 |
1 files changed, 145 insertions, 145 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 7437b7a8f5e..3e68b26e1c2 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -1,12 +1,12 @@ -#include "grpc_server.h" - -#include <util/string/join.h> -#include <util/generic/yexception.h> -#include <util/system/thread.h> - -#include <grpc++/resource_quota.h> +#include "grpc_server.h" + +#include <util/string/join.h> +#include <util/generic/yexception.h> +#include <util/system/thread.h> + +#include <grpc++/resource_quota.h> #include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> - + #if !defined(_WIN32) && !defined(_WIN64) #include <sys/socket.h> @@ -16,9 +16,9 @@ #endif namespace NGrpc { - -using NThreading::TFuture; - + +using NThreading::TFuture; + static void PullEvents(grpc::ServerCompletionQueue* cq) { TThread::SetCurrentThreadName("grpc_server"); while (true) { @@ -37,33 +37,33 @@ static void PullEvents(grpc::ServerCompletionQueue* cq) { } } -TGRpcServer::TGRpcServer(const TServerOptions& opts) - : Options_(opts) - , Limiter_(Options_.MaxGlobalRequestInFlight) - {} - -TGRpcServer::~TGRpcServer() { - Y_VERIFY(Ts.empty()); - Services_.clear(); -} - -void TGRpcServer::AddService(IGRpcServicePtr service) { - Services_.push_back(service); -} - -void TGRpcServer::Start() { +TGRpcServer::TGRpcServer(const TServerOptions& opts) + : Options_(opts) + , Limiter_(Options_.MaxGlobalRequestInFlight) + {} + +TGRpcServer::~TGRpcServer() { + Y_VERIFY(Ts.empty()); + Services_.clear(); +} + +void TGRpcServer::AddService(IGRpcServicePtr service) { + Services_.push_back(service); +} + +void TGRpcServer::Start() { TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695 - using grpc::ServerBuilder; - using grpc::ResourceQuota; - ServerBuilder builder; + using grpc::ServerBuilder; + using grpc::ResourceQuota; + ServerBuilder builder; auto credentials = grpc::InsecureServerCredentials(); if (Options_.SslData) { - grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; - keycert.cert_chain = std::move(Options_.SslData->Cert); - keycert.private_key = std::move(Options_.SslData->Key); - grpc::SslServerCredentialsOptions sslOps; - sslOps.pem_root_certs = std::move(Options_.SslData->Root); - sslOps.pem_key_cert_pairs.push_back(keycert); + grpc::SslServerCredentialsOptions::PemKeyCertPair keycert; + keycert.cert_chain = std::move(Options_.SslData->Cert); + keycert.private_key = std::move(Options_.SslData->Key); + grpc::SslServerCredentialsOptions sslOps; + sslOps.pem_root_certs = std::move(Options_.SslData->Root); + sslOps.pem_key_cert_pairs.push_back(keycert); credentials = grpc::SslServerCredentials(sslOps); } if (Options_.ExternalListener) { @@ -72,58 +72,58 @@ void TGRpcServer::Start() { credentials )); } else { - builder.AddListeningPort(server_address, credentials); - } - builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); - builder.SetMaxSendMessageSize(Options_.MaxMessageSize); - for (IGRpcServicePtr service : Services_) { + builder.AddListeningPort(server_address, credentials); + } + builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize); + builder.SetMaxSendMessageSize(Options_.MaxMessageSize); + for (IGRpcServicePtr service : Services_) { service->SetServerOptions(Options_); - builder.RegisterService(service->GetService()); - service->SetGlobalLimiterHandle(&Limiter_); - } - + builder.RegisterService(service->GetService()); + service->SetGlobalLimiterHandle(&Limiter_); + } + class TKeepAliveOption: public grpc::ServerBuilderOption { - public: - TKeepAliveOption(int idle, int interval) - : Idle(idle) - , Interval(interval) - , KeepAliveEnabled(true) - {} - - TKeepAliveOption() - : Idle(0) - , Interval(0) - , KeepAliveEnabled(false) - {} - - void UpdateArguments(grpc::ChannelArguments *args) override { - args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); - args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); - if (KeepAliveEnabled) { - args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); - args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); - args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); - args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); - args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); - } - } - - void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override - {} - private: - const int Idle; - const int Interval; - const bool KeepAliveEnabled; - }; - - if (Options_.KeepAliveEnable) { - builder.SetOption(std::make_unique<TKeepAliveOption>( - Options_.KeepAliveIdleTimeoutTriggerSec, - Options_.KeepAliveProbeIntervalSec)); - } else { - builder.SetOption(std::make_unique<TKeepAliveOption>()); - } - + public: + TKeepAliveOption(int idle, int interval) + : Idle(idle) + , Interval(interval) + , KeepAliveEnabled(true) + {} + + TKeepAliveOption() + : Idle(0) + , Interval(0) + , KeepAliveEnabled(false) + {} + + void UpdateArguments(grpc::ChannelArguments *args) override { + args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0); + args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000); + if (KeepAliveEnabled) { + args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000); + args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000); + } + } + + void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override + {} + private: + const int Idle; + const int Interval; + const bool KeepAliveEnabled; + }; + + if (Options_.KeepAliveEnable) { + builder.SetOption(std::make_unique<TKeepAliveOption>( + Options_.KeepAliveIdleTimeoutTriggerSec, + Options_.KeepAliveProbeIntervalSec)); + } else { + builder.SetOption(std::make_unique<TKeepAliveOption>()); + } + if (Options_.UseCompletionQueuePerThread) { for (size_t i = 0; i < Options_.WorkerThreads; ++i) { CQS_.push_back(builder.AddCompletionQueue()); @@ -132,30 +132,30 @@ void TGRpcServer::Start() { CQS_.push_back(builder.AddCompletionQueue()); } - if (Options_.GRpcMemoryQuotaBytes) { + if (Options_.GRpcMemoryQuotaBytes) { // See details KIKIMR-6932 - /* - grpc::ResourceQuota quota("memory_bound"); - quota.Resize(Options_.GRpcMemoryQuotaBytes); - - builder.SetResourceQuota(quota); - */ + /* + grpc::ResourceQuota quota("memory_bound"); + quota.Resize(Options_.GRpcMemoryQuotaBytes); + + builder.SetResourceQuota(quota); + */ Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl; - } + } Options_.ServerBuilderMutator(builder); builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel); - - Server_ = builder.BuildAndStart(); - if (!Server_) { + + Server_ = builder.BuildAndStart(); + if (!Server_) { ythrow yexception() << "can't start grpc server on " << server_address; - } + } size_t index = 0; - for (IGRpcServicePtr service : Services_) { + for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); - } - + } + if (Options_.UseCompletionQueuePerThread) { for (size_t i = 0; i < Options_.WorkerThreads; ++i) { auto* cq = &CQS_[i]; @@ -170,71 +170,71 @@ void TGRpcServer::Start() { PullEvents(cq->get()); })); } - } + } if (Options_.ExternalListener) { Options_.ExternalListener->Start(); } -} - -void TGRpcServer::Stop() { - for (auto& service : Services_) { - service->StopService(); - } - - auto now = TInstant::Now(); - - if (Server_) { - i64 sec = Options_.GRpcShutdownDeadline.Seconds(); - Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); - i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); - Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); - } - +} + +void TGRpcServer::Stop() { + for (auto& service : Services_) { + service->StopService(); + } + + auto now = TInstant::Now(); + + if (Server_) { + i64 sec = Options_.GRpcShutdownDeadline.Seconds(); + Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>()); + i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond(); + Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN}); + } + for (ui64 attempt = 0; ; ++attempt) { bool unsafe = false; - size_t infly = 0; - for (auto& service : Services_) { + size_t infly = 0; + for (auto& service : Services_) { unsafe |= service->IsUnsafeToShutdown(); infly += service->RequestsInProgress(); - } - + } + if (!unsafe && !infly) - break; + break; - auto spent = (TInstant::Now() - now).SecondsFloat(); + auto spent = (TInstant::Now() - now).SecondsFloat(); if (attempt % 300 == 0) { // don't log too much Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl; } if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) - break; - Sleep(TDuration::MilliSeconds(10)); - } - - // Always shutdown the completion queue after the server. + break; + Sleep(TDuration::MilliSeconds(10)); + } + + // Always shutdown the completion queue after the server. for (auto& cq : CQS_) { cq->Shutdown(); - } - - for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { - (*ti)->Join(); - } - - Ts.clear(); + } + + for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) { + (*ti)->Join(); + } + + Ts.clear(); if (Options_.ExternalListener) { Options_.ExternalListener->Stop(); } -} - -ui16 TGRpcServer::GetPort() const { - return Options_.Port; -} - -TString TGRpcServer::GetHost() const { - return Options_.Host; -} - +} + +ui16 TGRpcServer::GetPort() const { + return Options_.Port; +} + +TString TGRpcServer::GetHost() const { + return Options_.Host; +} + } // namespace NGrpc |