#include "grpc_server.h"
#include <util/string/join.h>
#include <util/generic/yexception.h>
#include <util/system/thread.h>
#include <grpc++/resource_quota.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
#if !defined(_WIN32) && !defined(_WIN64)
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif
namespace NGrpc {
using NThreading::TFuture;
// Worker-thread body: names the current thread "grpc_server" and then keeps
// pulling events from the given completion queue until Next() reports false.
// Each tag taken from the queue is treated as an IQueueEvent and executed;
// when Execute() returns false the event is asked to destroy its request.
static void PullEvents(grpc::ServerCompletionQueue* cq) {
    TThread::SetCurrentThreadName("grpc_server");

    void* tag = nullptr; // uniquely identifies a request.
    bool ok = false;
    while (cq->Next(&tag, &ok)) {
        auto* const event = static_cast<IQueueEvent*>(tag);
        if (event->Execute(ok)) {
            continue;
        }
        event->DestroyRequest();
    }
}
// Stores a copy of the supplied options and sizes the global in-flight
// request limiter from MaxGlobalRequestInFlight. Nothing is started here.
TGRpcServer::TGRpcServer(const TServerOptions& opts)
    : Options_(opts)
    , Limiter_(opts.MaxGlobalRequestInFlight)
{
}
// The worker thread list must already be empty when the server is destroyed
// (Stop() clears it); otherwise the verification below fails.
TGRpcServer::~TGRpcServer() {
    Y_VERIFY(Ts.empty());
    // Drop the references to the registered services.
    Services_.clear();
}
// Appends a service to the list of services handled by Start() and Stop().
// The pointer is taken by value, so it is moved into the list instead of
// being copied a second time.
void TGRpcServer::AddService(IGRpcServicePtr service) {
    Services_.push_back(std::move(service));
}
void TGRpcServer::Start() {
TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
using grpc::ServerBuilder;
using grpc::ResourceQuota;
ServerBuilder builder;
auto credentials = grpc::InsecureServerCredentials();
if (Options_.SslData) {
grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
keycert.cert_chain = std::move(Options_.SslData->Cert);
keycert.private_key = std::move(Options_.SslData->Key);
grpc::SslServerCredentialsOptions sslOps;
sslOps.pem_root_certs = std::move(Options_.SslData->Root);
sslOps.pem_key_cert_pairs.push_back(keycert);
credentials = grpc::SslServerCredentials(sslOps);
}
if (Options_.ExternalListener) {
Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
credentials
));
} else {
builder.AddListeningPort(server_address, credentials);
}
builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
for (IGRpcServicePtr service : Services_) {
service->SetServerOptions(Options_);
builder.RegisterService(service->GetService());
service->SetGlobalLimiterHandle(&Limiter_);
}
class TKeepAliveOption: public grpc::ServerBuilderOption {
public:
TKeepAliveOption(int idle, int interval)
: Idle(idle)
, Interval(interval)
, KeepAliveEnabled(true)
{}
TKeepAliveOption()
: Idle(0)
, Interval(0)
, KeepAliveEnabled(false)
{}
void UpdateArguments(grpc::ChannelArguments *args) override {
args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
if (KeepAliveEnabled) {
args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
}
}
void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
{}
private:
const int Idle;
const int Interval;
const bool KeepAliveEnabled;
};
if (Options_.KeepAliveEnable) {
builder.SetOption(std::make_unique<TKeepAliveOption>(
Options_.KeepAliveIdleTimeoutTriggerSec,
Options_.KeepAliveProbeIntervalSec));
} else {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
}
} else {
CQS_.push_back(builder.AddCompletionQueue());
}
if (Options_.GRpcMemoryQuotaBytes) {
// See details KIKIMR-6932
/*
grpc::ResourceQuota quota("memory_bound");
quota.Resize(Options_.GRpcMemoryQuotaBytes);
builder.SetResourceQuota(quota);
*/
Cerr << "GRpc memory quota temporarily disabled due to issues with grpc quoter" << Endl;
}
Options_.ServerBuilderMutator(builder);
builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
Server_ = builder.BuildAndStart();
if (!Server_) {
ythrow yexception() << "can't start grpc server on " << server_address;
}
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
}
if (Options_.UseCompletionQueuePerThread) {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[i];
Ts.push_back(SystemThreadFactory()->Run([cq] {
PullEvents(cq->get());
}));
}
} else {
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[0];
Ts.push_back(SystemThreadFactory()->Run([cq] {
PullEvents(cq->get());
}));
}
}
if (Options_.ExternalListener) {
Options_.ExternalListener->Start();
}
}
void TGRpcServer::Stop() {
for (auto& service : Services_) {
service->StopService();
}
auto now = TInstant::Now();
if (Server_) {
i64 sec = Options_.GRpcShutdownDeadline.Seconds();
Y_VERIFY(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
}
for (ui64 attempt = 0; ; ++attempt) {
bool unsafe = false;
size_t infly = 0;
for (auto& service : Services_) {
unsafe |= service->IsUnsafeToShutdown();
infly += service->RequestsInProgress();
}
if (!unsafe && !infly)
break;
auto spent = (TInstant::Now() - now).SecondsFloat();
if (attempt % 300 == 0) {
// don't log too much
Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
}
if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
break;
Sleep(TDuration::MilliSeconds(10));
}
// Always shutdown the completion queue after the server.
for (auto& cq : CQS_) {
cq->Shutdown();
}
for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
(*ti)->Join();
}
Ts.clear();
if (Options_.ExternalListener) {
Options_.ExternalListener->Stop();
}
}
// Returns the port stored in the server options.
ui16 TGRpcServer::GetPort() const {
    return Options_.Port;
}
// Returns the host stored in the server options.
TString TGRpcServer::GetHost() const {
    return Options_.Host;
}
} // namespace NGrpc