aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_request.h
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:03 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:03 +0300
commit2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5 (patch)
treeb83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/grpc/server/grpc_request.h
parent3e0b762a82514bac89c1dd6ea7211e381d8aa248 (diff)
downloadydb-2e714b5ebd40a1f4cc31c27f1ad6e49ca6d895f5.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.h')
-rw-r--r--library/cpp/grpc/server/grpc_request.h92
1 files changed, 46 insertions, 46 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index dd9041eec7..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -4,19 +4,19 @@
#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/priority.h>
-#include "grpc_response.h"
+#include "grpc_response.h"
#include "event_callback.h"
-#include "grpc_async_ctx_base.h"
-#include "grpc_counters.h"
+#include "grpc_async_ctx_base.h"
+#include "grpc_counters.h"
#include "grpc_request_base.h"
#include "grpc_server.h"
-#include "logger.h"
+#include "logger.h"
+
+#include <util/system/hp_timer.h>
-#include <util/system/hp_timer.h>
-
#include <grpc++/server.h>
#include <grpc++/server_context.h>
#include <grpc++/support/async_stream.h>
@@ -24,7 +24,7 @@
#include <grpc++/support/byte_buffer.h>
#include <grpc++/impl/codegen/async_stream.h>
-namespace NGrpc {
+namespace NGrpc {
class IStreamAdaptor {
public:
@@ -57,7 +57,7 @@ public:
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TRequestCallback requestCallback,
- const char* name,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
@@ -67,10 +67,10 @@ public:
, RequestCallback_(requestCallback)
, StreamRequestCallback_(nullptr)
, Name_(name)
- , Logger_(std::move(logger))
+ , Logger_(std::move(logger))
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
- , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
+ , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
{
AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
@@ -85,7 +85,7 @@ public:
grpc::ServerCompletionQueue* cq,
TOnRequest cb,
TStreamRequestCallback requestCallback,
- const char* name,
+ const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
@@ -95,7 +95,7 @@ public:
, RequestCallback_(nullptr)
, StreamRequestCallback_(requestCallback)
, Name_(name)
- , Logger_(std::move(logger))
+ , Logger_(std::move(logger))
, Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
@@ -157,13 +157,13 @@ public:
TInstant Deadline() const override {
return TBaseAsyncContext<TService>::Deadline();
- }
-
- TSet<TStringBuf> GetPeerMetaKeys() const override {
- return TBaseAsyncContext<TService>::GetPeerMetaKeys();
- }
-
- TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
+ }
+
+ TSet<TStringBuf> GetPeerMetaKeys() const override {
+ return TBaseAsyncContext<TService>::GetPeerMetaKeys();
+ }
+
+ TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
}
@@ -233,10 +233,10 @@ private:
if (!Server_->IsShuttingDown()) {
if (RequestCallback_) {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
} else {
MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
+ Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
}
}
}
@@ -257,20 +257,20 @@ private:
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
Y_VERIFY(this->Context.c_call());
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
+ Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
this, Name_, makeResponseString().data(), this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid message copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
+
+ // because of std::function cannot hold move-only captured object
+ // we allocate shared object on heap to avoid message copy
+ auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
+ auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
this, Name_, makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
- StreamWriter_->Write(*uResp, GetGRpcTag());
+ StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
@@ -283,20 +283,20 @@ private:
this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
ResponseSize = sz;
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
+ Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid buffer copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz]() {
+
+ // because of std::function cannot hold move-only captured object
+ // we allocate shared object on heap to avoid buffer copy
+ auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
+ auto cb = [this, uResp = std::move(uResp), sz]() {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
this, Name_, this->Context.peer().c_str());
StateFunc_ = &TThis::NextReply;
ResponseSize += sz;
- StreamWriter_->Write(*uResp, GetGRpcTag());
+ StreamWriter_->Write(*uResp, GetGRpcTag());
};
StreamAdaptor_->Enqueue(std::move(cb), false);
}
@@ -314,8 +314,8 @@ private:
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
StateFunc_ = &TThis::SetFinishError;
- TOut resp;
- Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
+ TOut resp;
+ Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
} else {
GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
" (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
@@ -380,7 +380,7 @@ private:
}
auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));
if (maybeToken.empty() || maybeToken[0].empty()) {
- TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
+ TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
Counters_->CountRequestsWithoutToken();
GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
"Name# %s data# %s peer# %s database# %s", this, Name_,
@@ -484,12 +484,12 @@ private:
TOnRequest Cb_;
TRequestCallback RequestCallback_;
TStreamRequestCallback StreamRequestCallback_;
- const char* const Name_;
+ const char* const Name_;
TLoggerPtr Logger_;
ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
- THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
+ THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
TStateFunc StateFunc_;
TIn* Request_;
@@ -520,10 +520,10 @@ public:
typename TBase::TOnRequest cb,
typename TBase::TRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter = nullptr)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
+ : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
{
}
@@ -533,11 +533,11 @@ public:
typename TBase::TOnRequest cb,
typename TBase::TStreamRequestCallback requestCallback,
const char* name,
- TLoggerPtr logger,
+ TLoggerPtr logger,
ICounterBlockPtr counters)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
+ : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
{
}
};
-} // namespace NGrpc
+} // namespace NGrpc