aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authormsherbakov <msherbakov@yandex-team.ru>2022-02-10 16:49:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:17 +0300
commita0ffafe83b7d6229709a32fa942c71d672ac989c (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc
parentc224a621661ddd69699f9476922eb316607ef57e (diff)
downloadydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp4
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h4
-rw-r--r--library/cpp/grpc/client/grpc_common.h28
-rw-r--r--library/cpp/grpc/server/grpc_counters.h38
-rw-r--r--library/cpp/grpc/server/grpc_request.h96
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h8
-rw-r--r--library/cpp/grpc/ya.make6
7 files changed, 92 insertions, 92 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index b2d0ea6eb2..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -15,7 +15,7 @@
#include <netinet/tcp.h>
#endif
-namespace NGrpc {
+namespace NGrpc {
void EnableGRpcTracing() {
grpc_tracer_set_enabled("tcp", true);
@@ -583,4 +583,4 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
}
}
-} // namespace NGRpc
+} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index e2a80e624e..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -21,7 +21,7 @@
* This file contains low level logic for grpc
* This file should not be used in high level code without special reason
*/
-namespace NGrpc {
+namespace NGrpc {
const size_t DEFAULT_NUM_THREADS = 2;
@@ -1396,4 +1396,4 @@ private:
std::mutex JoinMutex_;
};
-} // namespace NGRpc
+} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h
index 12a3f7c28e..ffcdafe045 100644
--- a/library/cpp/grpc/client/grpc_common.h
+++ b/library/cpp/grpc/client/grpc_common.h
@@ -1,7 +1,7 @@
#pragma once
#include <grpc++/grpc++.h>
-#include <grpc++/resource_quota.h>
+#include <grpc++/resource_quota.h>
#include <util/datetime/base.h>
#include <unordered_map>
@@ -9,19 +9,19 @@
constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-namespace NGrpc {
+namespace NGrpc {
struct TGRpcClientConfig {
TString Locator; // format host:port
TDuration Timeout = TDuration::Max(); // request timeout
ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
- ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests
- ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests
+ ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests
+ ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests
ui32 MaxInFlight = 0;
bool EnableSsl = false;
TString SslCaCert; //Implicitly enables Ssl if not empty
grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE;
- ui64 MemQuota = 0;
+ ui64 MemQuota = 0;
std::unordered_map<TString, TString> StringChannelParams;
std::unordered_map<TString, int> IntChannelParams;
TString LoadBalancingPolicy = { };
@@ -48,10 +48,10 @@ struct TGRpcClientConfig {
inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
grpc::ChannelArguments args;
- args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);
- args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);
+ args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);
+ args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);
args.SetCompressionAlgorithm(config.CompressionAlgoritm);
-
+
for (const auto& kvp: config.StringChannelParams) {
args.SetString(kvp.first, kvp.second);
}
@@ -60,11 +60,11 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
args.SetInt(kvp.first, kvp.second);
}
- if (config.MemQuota) {
- grpc::ResourceQuota quota;
- quota.Resize(config.MemQuota);
- args.SetResourceQuota(quota);
- }
+ if (config.MemQuota) {
+ grpc::ResourceQuota quota;
+ quota.Resize(config.MemQuota);
+ args.SetResourceQuota(quota);
+ }
if (mutator) {
args.SetSocketMutator(mutator);
}
@@ -81,4 +81,4 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
}
}
-} // namespace NGRpc
+} // namespace NGRpc
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h
index 387163cfea..0b6c36c84c 100644
--- a/library/cpp/grpc/server/grpc_counters.h
+++ b/library/cpp/grpc/server/grpc_counters.h
@@ -6,14 +6,14 @@
namespace NGrpc {
-struct ICounterBlock : public TThrRefBase {
- virtual void CountNotOkRequest() = 0;
- virtual void CountNotOkResponse() = 0;
- virtual void CountNotAuthenticated() = 0;
- virtual void CountResourceExhausted() = 0;
- virtual void CountRequestBytes(ui32 requestSize) = 0;
- virtual void CountResponseBytes(ui32 responseSize) = 0;
- virtual void StartProcessing(ui32 requestSize) = 0;
+struct ICounterBlock : public TThrRefBase {
+ virtual void CountNotOkRequest() = 0;
+ virtual void CountNotOkResponse() = 0;
+ virtual void CountNotAuthenticated() = 0;
+ virtual void CountResourceExhausted() = 0;
+ virtual void CountRequestBytes(ui32 requestSize) = 0;
+ virtual void CountResponseBytes(ui32 responseSize) = 0;
+ virtual void StartProcessing(ui32 requestSize) = 0;
virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;
virtual void CountRequestsWithoutDatabase() {}
virtual void CountRequestsWithoutToken() {}
@@ -21,11 +21,11 @@ struct ICounterBlock : public TThrRefBase {
virtual TIntrusivePtr<ICounterBlock> Clone() { return this; }
virtual void UseDatabase(const TString& database) { Y_UNUSED(database); }
-};
-
+};
+
using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>;
-class TCounterBlock final : public ICounterBlock {
+class TCounterBlock final : public ICounterBlock {
NMonitoring::TDynamicCounters::TCounterPtr TotalCounter;
NMonitoring::TDynamicCounters::TCounterPtr InflyCounter;
NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter;
@@ -36,7 +36,7 @@ class TCounterBlock final : public ICounterBlock {
NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated;
NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted;
bool Percentile = false;
- NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs;
+ NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs;
std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters;
public:
@@ -66,31 +66,31 @@ public:
}
}
- void CountNotOkRequest() override {
+ void CountNotOkRequest() override {
NotOkRequestCounter->Inc();
}
- void CountNotOkResponse() override {
+ void CountNotOkResponse() override {
NotOkResponseCounter->Inc();
}
- void CountNotAuthenticated() override {
+ void CountNotAuthenticated() override {
NotAuthenticated->Inc();
}
- void CountResourceExhausted() override {
+ void CountResourceExhausted() override {
ResourceExhausted->Inc();
}
- void CountRequestBytes(ui32 requestSize) override {
+ void CountRequestBytes(ui32 requestSize) override {
*RequestBytes += requestSize;
}
- void CountResponseBytes(ui32 responseSize) override {
+ void CountResponseBytes(ui32 responseSize) override {
*ResponseBytes += responseSize;
}
- void StartProcessing(ui32 requestSize) override {
+ void StartProcessing(ui32 requestSize) override {
TotalCounter->Inc();
InflyCounter->Inc();
*RequestBytes += requestSize;
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index c88d941815..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -38,13 +38,13 @@ IStreamAdaptor::TPtr CreateStreamAdaptor();
///////////////////////////////////////////////////////////////////////////////
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>
-class TGRpcRequestImpl
+class TGRpcRequestImpl
: public TBaseAsyncContext<TService>
, public IQueueEvent
, public IRequestContextBase
{
using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-
+
public:
using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
@@ -52,13 +52,13 @@ public:
using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
- TGRpcRequestImpl(TService* server,
+ TGRpcRequestImpl(TService* server,
typename TService::TCurrentGRpcService::AsyncService* service,
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
: TBaseAsyncContext<TService>(service, cq)
@@ -71,22 +71,22 @@ public:
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
+ , StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
Y_VERIFY(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
}
- TGRpcRequestImpl(TService* server,
+ TGRpcRequestImpl(TService* server,
typename TService::TCurrentGRpcService::AsyncService* service,
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TStreamRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
: TBaseAsyncContext<TService>(service, cq)
@@ -99,12 +99,12 @@ public:
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
+ , StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
Y_VERIFY(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
StreamAdaptor_ = CreateStreamAdaptor();
}
@@ -138,7 +138,7 @@ public:
}
}
- ~TGRpcRequestImpl() {
+ ~TGRpcRequestImpl() {
// No direct dtor call allowed
Y_ASSERT(RefCount() == 0);
}
@@ -190,10 +190,10 @@ public:
WriteByteDataOk(resp);
}
- void ReplyError(grpc::StatusCode code, const TString& msg) override {
+ void ReplyError(grpc::StatusCode code, const TString& msg) override {
FinishGrpcStatus(code, msg, false);
- }
-
+ }
+
void ReplyUnauthenticated(const TString& in) override {
const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
@@ -232,10 +232,10 @@ private:
void Clone() {
if (!Server_->IsShuttingDown()) {
if (RequestCallback_) {
- MakeIntrusive<TThis>(
+ MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
} else {
- MakeIntrusive<TThis>(
+ MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
}
}
@@ -254,7 +254,7 @@ private:
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
makeResponseString().data(), this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
+ StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Y_VERIFY(this->Context.c_call());
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -281,7 +281,7 @@ private:
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
+ StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
@@ -342,7 +342,7 @@ private:
}
return resp;
};
- GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
if (this->Context.c_call() == nullptr) {
@@ -352,7 +352,7 @@ private:
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
// Request cannot be registered due to shutdown
// It's unsafe to continue, so drop this request without processing
- GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
this->Context.TryCancel();
return false;
}
@@ -423,7 +423,7 @@ private:
}
bool SetFinishDone(bool ok) {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
//PrintBackTrace();
DecRequest();
@@ -433,7 +433,7 @@ private:
}
bool SetFinishError(bool ok) {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
if (!SkipUpdateCountersOnError) {
DecRequest();
@@ -479,14 +479,14 @@ private:
Server_->DecRequest();
}
- using TStateFunc = bool (TThis::*)(bool);
+ using TStateFunc = bool (TThis::*)(bool);
TService* Server_;
TOnRequest Cb_;
TRequestCallback RequestCallback_;
TStreamRequestCallback StreamRequestCallback_;
const char* const Name_;
- TLoggerPtr Logger_;
- ICounterBlockPtr Counters_;
+ TLoggerPtr Logger_;
+ ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
@@ -503,8 +503,8 @@ private:
TAuthState AuthState_ = 0;
bool RequestRegistered_ = false;
- using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
- TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
+ using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
+ TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
NThreading::TPromise<EFinishStatus> FinishPromise_;
bool SkipUpdateCountersOnError = false;
IStreamAdaptor::TPtr StreamAdaptor_;
@@ -513,31 +513,31 @@ private:
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {
using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-public:
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TRequestCallback requestCallback,
- const char* name,
+public:
+ TGRpcRequest(TService* server,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ typename TBase::TOnRequest cb,
+ typename TBase::TRequestCallback requestCallback,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter = nullptr)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
- {
- }
-
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TStreamRequestCallback requestCallback,
- const char* name,
+ {
+ }
+
+ TGRpcRequest(TService* server,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ typename TBase::TOnRequest cb,
+ typename TBase::TStreamRequestCallback requestCallback,
+ const char* name,
TLoggerPtr logger,
- ICounterBlockPtr counters)
+ ICounterBlockPtr counters)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
- {
- }
-};
-
+ {
+ }
+};
+
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index acf3871569..fcfce1c181 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -60,16 +60,16 @@ public:
//! Implementation can swap protobuf message
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
- //! Send serialised response (The request shoult be created for bytes response type)
+ //! Send serialised response (The request shoult be created for bytes response type)
//! Implementation can swap ByteBuffer
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;
//! Send grpc UNAUTHENTICATED status
virtual void ReplyUnauthenticated(const TString& in) = 0;
- //! Send grpc error
- virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0;
-
+ //! Send grpc error
+ virtual void ReplyError(grpc::StatusCode code, const TString& msg) = 0;
+
//! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise
virtual TInstant Deadline() const = 0;
diff --git a/library/cpp/grpc/ya.make b/library/cpp/grpc/ya.make
index 9239947ad1..3635124115 100644
--- a/library/cpp/grpc/ya.make
+++ b/library/cpp/grpc/ya.make
@@ -1,5 +1,5 @@
-RECURSE(
- client
+RECURSE(
+ client
common
server
-)
+)