From 1110808a9d39d4b808aef724c861a2e1a38d2a69 Mon Sep 17 00:00:00 2001
From: Devtools Arcadia <arcadia-devtools@yandex-team.ru>
Date: Mon, 7 Feb 2022 18:08:42 +0300
Subject: intermediate changes ref:cde9a383711a11544ce7e107a78147fb96cc4029

---
 library/cpp/grpc/server/grpc_server.cpp | 240 ++++++++++++++++++++++++++++++++
 1 file changed, 240 insertions(+)
 create mode 100644 library/cpp/grpc/server/grpc_server.cpp

(limited to 'library/cpp/grpc/server/grpc_server.cpp')

diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
new file mode 100644
index 0000000000..7437b7a8f5
--- /dev/null
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -0,0 +1,240 @@
+#include "grpc_server.h"
+
+#include <util/string/join.h>
+#include <util/generic/yexception.h>
+#include <util/system/thread.h>
+
+#include <grpc++/resource_quota.h>
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+
+#if !defined(_WIN32) && !defined(_WIN64)
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#endif
+
+namespace NGrpc {
+
+using NThreading::TFuture;
+
+static void PullEvents(grpc::ServerCompletionQueue* cq) {
+    TThread::SetCurrentThreadName("grpc_server");
+    while (true) {
+        void* tag; // uniquely identifies a request.
+        bool ok;
+
+        if (cq->Next(&tag, &ok)) {
+            IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
+
+            if (!ev->Execute(ok)) {
+                ev->DestroyRequest();
+            }
+        } else {
+            break;
+        }
+    }
+}
+
+TGRpcServer::TGRpcServer(const TServerOptions& opts)
+    : Options_(opts)
+    , Limiter_(Options_.MaxGlobalRequestInFlight)
+    {}
+
+TGRpcServer::~TGRpcServer() {
+    Y_VERIFY(Ts.empty());
+    Services_.clear();
+}
+
+void TGRpcServer::AddService(IGRpcServicePtr service) {
+    Services_.push_back(service);
+}
+
+void TGRpcServer::Start() {
+    TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
+    using grpc::ServerBuilder;
+    using grpc::ResourceQuota;
+    ServerBuilder builder;
+    auto credentials = grpc::InsecureServerCredentials();
+    if (Options_.SslData) {
+        grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
+        keycert.cert_chain = std::move(Options_.SslData->Cert);
+        keycert.private_key = std::move(Options_.SslData->Key);
+        grpc::SslServerCredentialsOptions sslOps;
+        sslOps.pem_root_certs = std::move(Options_.SslData->Root);
+        sslOps.pem_key_cert_pairs.push_back(keycert);
+        credentials = grpc::SslServerCredentials(sslOps);
+    }
+    if (Options_.ExternalListener) {
+        Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
+            ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
+            credentials
+        ));
+    } else {
+        builder.AddListeningPort(server_address, credentials);
+    }
+    builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
+    builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
+    for (IGRpcServicePtr service : Services_) {
+        service->SetServerOptions(Options_);
+        builder.RegisterService(service->GetService());
+        service->SetGlobalLimiterHandle(&Limiter_);
+    }
+
+    class TKeepAliveOption: public grpc::ServerBuilderOption {
+    public:
+        TKeepAliveOption(int idle, int interval)
+            : Idle(idle)
+            , Interval(interval)
+            , KeepAliveEnabled(true)
+        {}
+
+        TKeepAliveOption()
+            : Idle(0)
+            , Interval(0)
+            , KeepAliveEnabled(false)
+       {}
+
+       void UpdateArguments(grpc::ChannelArguments *args) override {
+            args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
+            args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
+            if (KeepAliveEnabled) {
+                args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+                args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+                args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
+                args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
+                args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
+            }
+        }
+
+        void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
+        {}
+    private:
+        const int Idle;
+        const int Interval;
+        const bool KeepAliveEnabled;
+    };
+
+    if (Options_.KeepAliveEnable) {
+        builder.SetOption(std::make_unique<TKeepAliveOption>(
+            Options_.KeepAliveIdleTimeoutTriggerSec,
+            Options_.KeepAliveProbeIntervalSec));
+    } else {
+        builder.SetOption(std::make_unique<TKeepAliveOption>());
+    }
+
+    if (Options_.UseCompletionQueuePerThread) {
+        for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+            CQS_.push_back(builder.AddCompletionQueue());
+        }
+    } else {
+        CQS_.push_back(builder.AddCompletionQueue());
+    }
+
+    if (Options_.GRpcMemoryQuotaBytes) {
+        // See details KIKIMR-6932
+        /*
+        grpc::ResourceQuota quota("memory_bound");
+        quota.Resize(Options_.GRpcMemoryQuotaBytes);
+
+        builder.SetResourceQuota(quota);
+        */
+        Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl;
+    }
+    Options_.ServerBuilderMutator(builder);
+    builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
+
+    Server_ = builder.BuildAndStart();
+    if (!Server_) {
+        ythrow yexception() << "can't start grpc server on " << server_address;
+    }
+
+    size_t index = 0;
+    for (IGRpcServicePtr service : Services_) {
+        // TODO: provide something else for services instead of ServerCompletionQueue
+        service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+    }
+
+    if (Options_.UseCompletionQueuePerThread) {
+        for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+            auto* cq = &CQS_[i];
+            Ts.push_back(SystemThreadFactory()->Run([cq] {
+                PullEvents(cq->get());
+            }));
+        }
+    } else {
+        for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+            auto* cq = &CQS_[0];
+            Ts.push_back(SystemThreadFactory()->Run([cq] {
+                PullEvents(cq->get());
+            }));
+        }
+    }
+
+    if (Options_.ExternalListener) {
+        Options_.ExternalListener->Start();
+    }
+}
+
+void TGRpcServer::Stop() {
+    for (auto& service : Services_) {
+        service->StopService();
+    }
+
+    auto now = TInstant::Now();
+
+    if (Server_) {
+        i64 sec = Options_.GRpcShutdownDeadline.Seconds();
+        Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
+        i32 nanosecOfSec =  Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
+        Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
+    }
+
+    for (ui64 attempt = 0; ; ++attempt) {
+        bool unsafe = false;
+        size_t infly = 0;
+        for (auto& service : Services_) {
+            unsafe |= service->IsUnsafeToShutdown();
+            infly += service->RequestsInProgress();
+        }
+
+        if (!unsafe && !infly)
+            break;
+
+        auto spent = (TInstant::Now() - now).SecondsFloat();
+        if (attempt % 300 == 0) {
+            // don't log too much
+            Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" <<  Endl;
+        }
+
+        if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
+            break;
+        Sleep(TDuration::MilliSeconds(10));
+    }
+
+    // Always shutdown the completion queue after the server.
+    for (auto& cq : CQS_) {
+        cq->Shutdown();
+    }
+
+    for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
+        (*ti)->Join();
+    }
+
+    Ts.clear();
+
+    if (Options_.ExternalListener) {
+        Options_.ExternalListener->Stop();
+    }
+}
+
+ui16 TGRpcServer::GetPort() const {
+    return Options_.Port;
+}
+
+TString TGRpcServer::GetHost() const {
+    return Options_.Host;
+}
+
+} // namespace NGrpc
-- 
cgit v1.2.3