summaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_server.cpp
diff options
context:
space:
mode:
authordcherednik <[email protected]>2023-11-29 13:50:45 +0300
committerdcherednik <[email protected]>2023-11-29 15:47:31 +0300
commit10fd58d05678db9a22303a46178f5ed6c7150601 (patch)
tree3a9837bd2156df5d6e54d182679e8f0f44f7bc7d /library/cpp/grpc/server/grpc_server.cpp
parente92c0cb46ca4a92ac06cef509ab210296d9f0b99 (diff)
Use own copy of library/grpc
Diffstat (limited to 'library/cpp/grpc/server/grpc_server.cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp251
1 files changed, 0 insertions, 251 deletions
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
deleted file mode 100644
index a38d4c9da6c..00000000000
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-#include "grpc_server.h"
-
-#include <util/string/join.h>
-#include <util/generic/yexception.h>
-#include <util/system/thread.h>
-#include <util/generic/map.h>
-
-#include <grpc++/resource_quota.h>
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-#endif
-
-namespace NGrpc {
-
-using NThreading::TFuture;
-
-static void PullEvents(grpc::ServerCompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_server");
- while (true) {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- if (cq->Next(&tag, &ok)) {
- IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
-
- if (!ev->Execute(ok)) {
- ev->DestroyRequest();
- }
- } else {
- break;
- }
- }
-}
-
-TGRpcServer::TGRpcServer(const TServerOptions& opts)
- : Options_(opts)
- , Limiter_(Options_.MaxGlobalRequestInFlight)
- {}
-
-TGRpcServer::~TGRpcServer() {
- Y_ABORT_UNLESS(Ts.empty());
- Services_.clear();
-}
-
-void TGRpcServer::AddService(IGRpcServicePtr service) {
- Services_.push_back(service);
-}
-
-void TGRpcServer::Start() {
- TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
- using grpc::ServerBuilder;
- using grpc::ResourceQuota;
- ServerBuilder builder;
- auto credentials = grpc::InsecureServerCredentials();
- if (Options_.SslData) {
- grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
- keycert.cert_chain = std::move(Options_.SslData->Cert);
- keycert.private_key = std::move(Options_.SslData->Key);
- grpc::SslServerCredentialsOptions sslOps;
- sslOps.pem_root_certs = std::move(Options_.SslData->Root);
- sslOps.pem_key_cert_pairs.push_back(keycert);
-
- if (Options_.SslData->DoRequestClientCertificate) {
- sslOps.client_certificate_request = GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
- }
-
- credentials = grpc::SslServerCredentials(sslOps);
- }
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
- ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
- credentials
- ));
- } else {
- builder.AddListeningPort(server_address, credentials);
- }
- builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
- builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
- for (IGRpcServicePtr service : Services_) {
- service->SetServerOptions(Options_);
- builder.RegisterService(service->GetService());
- service->SetGlobalLimiterHandle(&Limiter_);
- }
-
- class TKeepAliveOption: public grpc::ServerBuilderOption {
- public:
- TKeepAliveOption(int idle, int interval)
- : Idle(idle)
- , Interval(interval)
- , KeepAliveEnabled(true)
- {}
-
- TKeepAliveOption()
- : Idle(0)
- , Interval(0)
- , KeepAliveEnabled(false)
- {}
-
- void UpdateArguments(grpc::ChannelArguments *args) override {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
- args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
- if (KeepAliveEnabled) {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
- args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
- args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
- }
- }
-
- void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
- {}
- private:
- const int Idle;
- const int Interval;
- const bool KeepAliveEnabled;
- };
-
- if (Options_.KeepAliveEnable) {
- builder.SetOption(std::make_unique<TKeepAliveOption>(
- Options_.KeepAliveIdleTimeoutTriggerSec,
- Options_.KeepAliveProbeIntervalSec));
- } else {
- builder.SetOption(std::make_unique<TKeepAliveOption>());
- }
-
- size_t completionQueueCount = 1;
- if (Options_.WorkersPerCompletionQueue) {
- size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
- completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
- } else if (Options_.UseCompletionQueuePerThread) {
- completionQueueCount = Options_.WorkerThreads;
- }
-
- CQS_.reserve(completionQueueCount);
- for (size_t i = 0; i < completionQueueCount; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
-
- if (Options_.GRpcMemoryQuotaBytes) {
- // See details KIKIMR-6932
- if (Options_.EnableGRpcMemoryQuota) {
- grpc::ResourceQuota quota("memory_bound");
- quota.Resize(Options_.GRpcMemoryQuotaBytes);
-
- builder.SetResourceQuota(quota);
-
- Cerr << "Set GRpc memory quota to: " << Options_.GRpcMemoryQuotaBytes << Endl;
- } else {
- Cerr << "GRpc memory quota was set but disabled due to issues with grpc quoter"
- ", to enable it use EnableGRpcMemoryQuota option" << Endl;
- }
- }
- Options_.ServerBuilderMutator(builder);
- builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
-
- Server_ = builder.BuildAndStart();
- if (!Server_) {
- ythrow yexception() << "can't start grpc server on " << server_address;
- }
-
- size_t index = 0;
- for (IGRpcServicePtr service : Services_) {
- // TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_, Options_.Logger, index++);
- }
-
- Ts.reserve(Options_.WorkerThreads);
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i % CQS_.size()];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Start();
- }
-}
-
-void TGRpcServer::Stop() {
- for (auto& service : Services_) {
- service->StopService();
- }
-
- auto now = TInstant::Now();
-
- if (Server_) {
- i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_ABORT_UNLESS(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
- i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
- Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
- }
-
- for (ui64 attempt = 0; ; ++attempt) {
- bool unsafe = false;
- size_t infly = 0;
- for (auto& service : Services_) {
- unsafe |= service->IsUnsafeToShutdown();
- infly += service->RequestsInProgress();
- }
-
- if (!unsafe && !infly)
- break;
-
- auto spent = (TInstant::Now() - now).SecondsFloat();
- if (attempt % 300 == 0) {
- // don't log too much
- Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
- }
-
- if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
- break;
- Sleep(TDuration::MilliSeconds(10));
- }
-
- // Always shutdown the completion queue after the server.
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
-
- for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
- (*ti)->Join();
- }
-
- Ts.clear();
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Stop();
- }
-}
-
-ui16 TGRpcServer::GetPort() const {
- return Options_.Port;
-}
-
-TString TGRpcServer::GetHost() const {
- return Options_.Host;
-}
-
-const TVector<TGRpcServer::IGRpcServicePtr>& TGRpcServer::GetServices() const {
- return Services_;
-}
-
-} // namespace NGrpc