aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_request.h
diff options
context:
space:
mode:
authormsherbakov <msherbakov@yandex-team.ru>2022-02-10 16:49:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:17 +0300
commita0ffafe83b7d6229709a32fa942c71d672ac989c (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/grpc_request.h
parentc224a621661ddd69699f9476922eb316607ef57e (diff)
downloadydb-a0ffafe83b7d6229709a32fa942c71d672ac989c.tar.gz
Restoring authorship annotation for <msherbakov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.h')
-rw-r--r--library/cpp/grpc/server/grpc_request.h96
1 files changed, 48 insertions, 48 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index c88d941815..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -38,13 +38,13 @@ IStreamAdaptor::TPtr CreateStreamAdaptor();
///////////////////////////////////////////////////////////////////////////////
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>
-class TGRpcRequestImpl
+class TGRpcRequestImpl
: public TBaseAsyncContext<TService>
, public IQueueEvent
, public IRequestContextBase
{
using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-
+
public:
using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
@@ -52,13 +52,13 @@ public:
using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
- TGRpcRequestImpl(TService* server,
+ TGRpcRequestImpl(TService* server,
typename TService::TCurrentGRpcService::AsyncService* service,
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
: TBaseAsyncContext<TService>(service, cq)
@@ -71,22 +71,22 @@ public:
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
+ , StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
Y_VERIFY(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
}
- TGRpcRequestImpl(TService* server,
+ TGRpcRequestImpl(TService* server,
typename TService::TCurrentGRpcService::AsyncService* service,
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TStreamRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
: TBaseAsyncContext<TService>(service, cq)
@@ -99,12 +99,12 @@ public:
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
+ , StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
Y_VERIFY(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
StreamAdaptor_ = CreateStreamAdaptor();
}
@@ -138,7 +138,7 @@ public:
}
}
- ~TGRpcRequestImpl() {
+ ~TGRpcRequestImpl() {
// No direct dtor call allowed
Y_ASSERT(RefCount() == 0);
}
@@ -190,10 +190,10 @@ public:
WriteByteDataOk(resp);
}
- void ReplyError(grpc::StatusCode code, const TString& msg) override {
+ void ReplyError(grpc::StatusCode code, const TString& msg) override {
FinishGrpcStatus(code, msg, false);
- }
-
+ }
+
void ReplyUnauthenticated(const TString& in) override {
const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
@@ -232,10 +232,10 @@ private:
void Clone() {
if (!Server_->IsShuttingDown()) {
if (RequestCallback_) {
- MakeIntrusive<TThis>(
+ MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
} else {
- MakeIntrusive<TThis>(
+ MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
}
}
@@ -254,7 +254,7 @@ private:
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
makeResponseString().data(), this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
+ StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Y_VERIFY(this->Context.c_call());
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
@@ -281,7 +281,7 @@ private:
if (Writer_) {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
+ StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
@@ -342,7 +342,7 @@ private:
}
return resp;
};
- GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
if (this->Context.c_call() == nullptr) {
@@ -352,7 +352,7 @@ private:
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
// Request cannot be registered due to shutdown
// It's unsafe to continue, so drop this request without processing
- GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
+ GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
this->Context.TryCancel();
return false;
}
@@ -423,7 +423,7 @@ private:
}
bool SetFinishDone(bool ok) {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
//PrintBackTrace();
DecRequest();
@@ -433,7 +433,7 @@ private:
}
bool SetFinishError(bool ok) {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
ok ? "true" : "false", this->Context.peer().c_str());
if (!SkipUpdateCountersOnError) {
DecRequest();
@@ -479,14 +479,14 @@ private:
Server_->DecRequest();
}
- using TStateFunc = bool (TThis::*)(bool);
+ using TStateFunc = bool (TThis::*)(bool);
TService* Server_;
TOnRequest Cb_;
TRequestCallback RequestCallback_;
TStreamRequestCallback StreamRequestCallback_;
const char* const Name_;
- TLoggerPtr Logger_;
- ICounterBlockPtr Counters_;
+ TLoggerPtr Logger_;
+ ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
@@ -503,8 +503,8 @@ private:
TAuthState AuthState_ = 0;
bool RequestRegistered_ = false;
- using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
- TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
+ using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
+ TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
NThreading::TPromise<EFinishStatus> FinishPromise_;
bool SkipUpdateCountersOnError = false;
IStreamAdaptor::TPtr StreamAdaptor_;
@@ -513,31 +513,31 @@ private:
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {
using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-public:
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TRequestCallback requestCallback,
- const char* name,
+public:
+ TGRpcRequest(TService* server,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ typename TBase::TOnRequest cb,
+ typename TBase::TRequestCallback requestCallback,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter = nullptr)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
- {
- }
-
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TStreamRequestCallback requestCallback,
- const char* name,
+ {
+ }
+
+ TGRpcRequest(TService* server,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ typename TBase::TOnRequest cb,
+ typename TBase::TStreamRequestCallback requestCallback,
+ const char* name,
TLoggerPtr logger,
- ICounterBlockPtr counters)
+ ICounterBlockPtr counters)
: TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
- {
- }
-};
-
+ {
+ }
+};
+
} // namespace NGrpc