aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/server/grpc_server.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp240
1 files changed, 240 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
new file mode 100644
index 0000000000..7437b7a8f5
--- /dev/null
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -0,0 +1,240 @@
+#include "grpc_server.h"
+
+#include <util/string/join.h>
+#include <util/generic/yexception.h>
+#include <util/system/thread.h>
+
+#include <grpc++/resource_quota.h>
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+
+#if !defined(_WIN32) && !defined(_WIN64)
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#endif
+
+namespace NGrpc {
+
+using NThreading::TFuture;
+
+static void PullEvents(grpc::ServerCompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_server");
+ while (true) {
+ void* tag; // uniquely identifies a request.
+ bool ok;
+
+ if (cq->Next(&tag, &ok)) {
+ IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
+
+ if (!ev->Execute(ok)) {
+ ev->DestroyRequest();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
+TGRpcServer::TGRpcServer(const TServerOptions& opts)
+ : Options_(opts)
+ , Limiter_(Options_.MaxGlobalRequestInFlight)
+ {}
+
+TGRpcServer::~TGRpcServer() {
+ Y_VERIFY(Ts.empty());
+ Services_.clear();
+}
+
+void TGRpcServer::AddService(IGRpcServicePtr service) {
+ Services_.push_back(service);
+}
+
+void TGRpcServer::Start() {
+ TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
+ using grpc::ServerBuilder;
+ using grpc::ResourceQuota;
+ ServerBuilder builder;
+ auto credentials = grpc::InsecureServerCredentials();
+ if (Options_.SslData) {
+ grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
+ keycert.cert_chain = std::move(Options_.SslData->Cert);
+ keycert.private_key = std::move(Options_.SslData->Key);
+ grpc::SslServerCredentialsOptions sslOps;
+ sslOps.pem_root_certs = std::move(Options_.SslData->Root);
+ sslOps.pem_key_cert_pairs.push_back(keycert);
+ credentials = grpc::SslServerCredentials(sslOps);
+ }
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
+ ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
+ credentials
+ ));
+ } else {
+ builder.AddListeningPort(server_address, credentials);
+ }
+ builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
+ builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
+ for (IGRpcServicePtr service : Services_) {
+ service->SetServerOptions(Options_);
+ builder.RegisterService(service->GetService());
+ service->SetGlobalLimiterHandle(&Limiter_);
+ }
+
+ class TKeepAliveOption: public grpc::ServerBuilderOption {
+ public:
+ TKeepAliveOption(int idle, int interval)
+ : Idle(idle)
+ , Interval(interval)
+ , KeepAliveEnabled(true)
+ {}
+
+ TKeepAliveOption()
+ : Idle(0)
+ , Interval(0)
+ , KeepAliveEnabled(false)
+ {}
+
+ void UpdateArguments(grpc::ChannelArguments *args) override {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
+ if (KeepAliveEnabled) {
+ args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+ args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+ args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
+ args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
+ }
+ }
+
+ void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
+ {}
+ private:
+ const int Idle;
+ const int Interval;
+ const bool KeepAliveEnabled;
+ };
+
+ if (Options_.KeepAliveEnable) {
+ builder.SetOption(std::make_unique<TKeepAliveOption>(
+ Options_.KeepAliveIdleTimeoutTriggerSec,
+ Options_.KeepAliveProbeIntervalSec));
+ } else {
+ builder.SetOption(std::make_unique<TKeepAliveOption>());
+ }
+
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+ } else {
+ CQS_.push_back(builder.AddCompletionQueue());
+ }
+
+ if (Options_.GRpcMemoryQuotaBytes) {
+ // See details KIKIMR-6932
+ /*
+ grpc::ResourceQuota quota("memory_bound");
+ quota.Resize(Options_.GRpcMemoryQuotaBytes);
+
+ builder.SetResourceQuota(quota);
+ */
+ Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl;
+ }
+ Options_.ServerBuilderMutator(builder);
+ builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
+
+ Server_ = builder.BuildAndStart();
+ if (!Server_) {
+ ythrow yexception() << "can't start grpc server on " << server_address;
+ }
+
+ size_t index = 0;
+ for (IGRpcServicePtr service : Services_) {
+ // TODO: provide something else for services instead of ServerCompletionQueue
+ service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ }
+
+ if (Options_.UseCompletionQueuePerThread) {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
+ } else {
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[0];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
+ }
+ }
+
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Start();
+ }
+}
+
+void TGRpcServer::Stop() {
+ for (auto& service : Services_) {
+ service->StopService();
+ }
+
+ auto now = TInstant::Now();
+
+ if (Server_) {
+ i64 sec = Options_.GRpcShutdownDeadline.Seconds();
+ Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+ i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
+ Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
+ }
+
+ for (ui64 attempt = 0; ; ++attempt) {
+ bool unsafe = false;
+ size_t infly = 0;
+ for (auto& service : Services_) {
+ unsafe |= service->IsUnsafeToShutdown();
+ infly += service->RequestsInProgress();
+ }
+
+ if (!unsafe && !infly)
+ break;
+
+ auto spent = (TInstant::Now() - now).SecondsFloat();
+ if (attempt % 300 == 0) {
+ // don't log too much
+ Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
+ }
+
+ if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
+ break;
+ Sleep(TDuration::MilliSeconds(10));
+ }
+
+ // Always shutdown the completion queue after the server.
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
+ }
+
+ for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
+ (*ti)->Join();
+ }
+
+ Ts.clear();
+
+ if (Options_.ExternalListener) {
+ Options_.ExternalListener->Stop();
+ }
+}
+
+ui16 TGRpcServer::GetPort() const {
+ return Options_.Port;
+}
+
+TString TGRpcServer::GetHost() const {
+ return Options_.Host;
+}
+
+} // namespace NGrpc