summaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <[email protected]>2022-02-10 16:50:16 +0300
committerDaniil Cherednik <[email protected]>2022-02-10 16:50:16 +0300
commit17e20fa084178ddcb16255f974dbde74fb93608b (patch)
tree39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc/server/grpc_server.cpp
parent97df5ca7413550bf233fc6c7210e292fca0a51af (diff)
Restoring authorship annotation for Daniil Cherednik <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp290
1 files changed, 145 insertions, 145 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 7437b7a8f5e..3e68b26e1c2 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -1,12 +1,12 @@
-#include "grpc_server.h"
-
-#include <util/string/join.h>
-#include <util/generic/yexception.h>
-#include <util/system/thread.h>
-
-#include <grpc++/resource_quota.h>
+#include "grpc_server.h"
+
+#include <util/string/join.h>
+#include <util/generic/yexception.h>
+#include <util/system/thread.h>
+
+#include <grpc++/resource_quota.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-
+
#if !defined(_WIN32) && !defined(_WIN64)
#include <sys/socket.h>
@@ -16,9 +16,9 @@
#endif
namespace NGrpc {
-
-using NThreading::TFuture;
-
+
+using NThreading::TFuture;
+
static void PullEvents(grpc::ServerCompletionQueue* cq) {
TThread::SetCurrentThreadName("grpc_server");
while (true) {
@@ -37,33 +37,33 @@ static void PullEvents(grpc::ServerCompletionQueue* cq) {
}
}
-TGRpcServer::TGRpcServer(const TServerOptions& opts)
- : Options_(opts)
- , Limiter_(Options_.MaxGlobalRequestInFlight)
- {}
-
-TGRpcServer::~TGRpcServer() {
- Y_VERIFY(Ts.empty());
- Services_.clear();
-}
-
-void TGRpcServer::AddService(IGRpcServicePtr service) {
- Services_.push_back(service);
-}
-
-void TGRpcServer::Start() {
+TGRpcServer::TGRpcServer(const TServerOptions& opts)
+ : Options_(opts)
+ , Limiter_(Options_.MaxGlobalRequestInFlight)
+ {}
+
+TGRpcServer::~TGRpcServer() {
+ Y_VERIFY(Ts.empty());
+ Services_.clear();
+}
+
+void TGRpcServer::AddService(IGRpcServicePtr service) {
+ Services_.push_back(service);
+}
+
+void TGRpcServer::Start() {
TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
- using grpc::ServerBuilder;
- using grpc::ResourceQuota;
- ServerBuilder builder;
+ using grpc::ServerBuilder;
+ using grpc::ResourceQuota;
+ ServerBuilder builder;
auto credentials = grpc::InsecureServerCredentials();
if (Options_.SslData) {
- grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
- keycert.cert_chain = std::move(Options_.SslData->Cert);
- keycert.private_key = std::move(Options_.SslData->Key);
- grpc::SslServerCredentialsOptions sslOps;
- sslOps.pem_root_certs = std::move(Options_.SslData->Root);
- sslOps.pem_key_cert_pairs.push_back(keycert);
+ grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
+ keycert.cert_chain = std::move(Options_.SslData->Cert);
+ keycert.private_key = std::move(Options_.SslData->Key);
+ grpc::SslServerCredentialsOptions sslOps;
+ sslOps.pem_root_certs = std::move(Options_.SslData->Root);
+ sslOps.pem_key_cert_pairs.push_back(keycert);
credentials = grpc::SslServerCredentials(sslOps);
}
if (Options_.ExternalListener) {
@@ -72,58 +72,58 @@ void TGRpcServer::Start() {
credentials
));
} else {
- builder.AddListeningPort(server_address, credentials);
- }
- builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
- builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
- for (IGRpcServicePtr service : Services_) {
+ builder.AddListeningPort(server_address, credentials);
+ }
+ builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
+ builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
+ for (IGRpcServicePtr service : Services_) {
service->SetServerOptions(Options_);
- builder.RegisterService(service->GetService());
- service->SetGlobalLimiterHandle(&Limiter_);
- }
-
+ builder.RegisterService(service->GetService());
+ service->SetGlobalLimiterHandle(&Limiter_);
+ }
+
class TKeepAliveOption: public grpc::ServerBuilderOption {
- public:
- TKeepAliveOption(int idle, int interval)
- : Idle(idle)
- , Interval(interval)
- , KeepAliveEnabled(true)
- {}
-
- TKeepAliveOption()
- : Idle(0)
- , Interval(0)
- , KeepAliveEnabled(false)
- {}
-
- void UpdateArguments(grpc::ChannelArguments *args) override {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
- args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
- if (KeepAliveEnabled) {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
- args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
- args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
- }
- }
-
- void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
- {}
- private:
- const int Idle;
- const int Interval;
- const bool KeepAliveEnabled;
- };
-
- if (Options_.KeepAliveEnable) {
- builder.SetOption(std::make_unique<TKeepAliveOption>(
- Options_.KeepAliveIdleTimeoutTriggerSec,
- Options_.KeepAliveProbeIntervalSec));
- } else {
- builder.SetOption(std::make_unique<TKeepAliveOption>());
- }
-
+ public:
+ TKeepAliveOption(int idle, int interval)
+ : Idle(idle)
+ , Interval(interval)
+ , KeepAliveEnabled(true)
+ {}
+
+ TKeepAliveOption()
+ : Idle(0)
+ , Interval(0)
+ , KeepAliveEnabled(false)
+ {}
+
+ void UpdateArguments(grpc::ChannelArguments *args) override {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
+ if (KeepAliveEnabled) {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+ args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
+ }
+ }
+
+ void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
+ {}
+ private:
+ const int Idle;
+ const int Interval;
+ const bool KeepAliveEnabled;
+ };
+
+ if (Options_.KeepAliveEnable) {
+ builder.SetOption(std::make_unique<TKeepAliveOption>(
+ Options_.KeepAliveIdleTimeoutTriggerSec,
+ Options_.KeepAliveProbeIntervalSec));
+ } else {
+ builder.SetOption(std::make_unique<TKeepAliveOption>());
+ }
+
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
@@ -132,30 +132,30 @@ void TGRpcServer::Start() {
CQS_.push_back(builder.AddCompletionQueue());
}
- if (Options_.GRpcMemoryQuotaBytes) {
+ if (Options_.GRpcMemoryQuotaBytes) {
// See details KIKIMR-6932
- /*
- grpc::ResourceQuota quota("memory_bound");
- quota.Resize(Options_.GRpcMemoryQuotaBytes);
-
- builder.SetResourceQuota(quota);
- */
+ /*
+ grpc::ResourceQuota quota("memory_bound");
+ quota.Resize(Options_.GRpcMemoryQuotaBytes);
+
+ builder.SetResourceQuota(quota);
+ */
Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl;
- }
+ }
Options_.ServerBuilderMutator(builder);
builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
-
- Server_ = builder.BuildAndStart();
- if (!Server_) {
+
+ Server_ = builder.BuildAndStart();
+ if (!Server_) {
ythrow yexception() << "can't start grpc server on " << server_address;
- }
+ }
size_t index = 0;
- for (IGRpcServicePtr service : Services_) {
+ for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
- }
-
+ }
+
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[i];
@@ -170,71 +170,71 @@ void TGRpcServer::Start() {
PullEvents(cq->get());
}));
}
- }
+ }
if (Options_.ExternalListener) {
Options_.ExternalListener->Start();
}
-}
-
-void TGRpcServer::Stop() {
- for (auto& service : Services_) {
- service->StopService();
- }
-
- auto now = TInstant::Now();
-
- if (Server_) {
- i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
- i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
- Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
- }
-
+}
+
+void TGRpcServer::Stop() {
+ for (auto& service : Services_) {
+ service->StopService();
+ }
+
+ auto now = TInstant::Now();
+
+ if (Server_) {
+ i64 sec = Options_.GRpcShutdownDeadline.Seconds();
+ Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+ i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
+ Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
+ }
+
for (ui64 attempt = 0; ; ++attempt) {
bool unsafe = false;
- size_t infly = 0;
- for (auto& service : Services_) {
+ size_t infly = 0;
+ for (auto& service : Services_) {
unsafe |= service->IsUnsafeToShutdown();
infly += service->RequestsInProgress();
- }
-
+ }
+
if (!unsafe && !infly)
- break;
+ break;
- auto spent = (TInstant::Now() - now).SecondsFloat();
+ auto spent = (TInstant::Now() - now).SecondsFloat();
if (attempt % 300 == 0) {
// don't log too much
Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
}
if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
- break;
- Sleep(TDuration::MilliSeconds(10));
- }
-
- // Always shutdown the completion queue after the server.
+ break;
+ Sleep(TDuration::MilliSeconds(10));
+ }
+
+ // Always shutdown the completion queue after the server.
for (auto& cq : CQS_) {
cq->Shutdown();
- }
-
- for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
- (*ti)->Join();
- }
-
- Ts.clear();
+ }
+
+ for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
+ (*ti)->Join();
+ }
+
+ Ts.clear();
if (Options_.ExternalListener) {
Options_.ExternalListener->Stop();
}
-}
-
-ui16 TGRpcServer::GetPort() const {
- return Options_.Port;
-}
-
-TString TGRpcServer::GetHost() const {
- return Options_.Host;
-}
-
+}
+
+ui16 TGRpcServer::GetPort() const {
+ return Options_.Port;
+}
+
+TString TGRpcServer::GetHost() const {
+ return Options_.Host;
+}
+
} // namespace NGrpc