aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/grpc_request.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:16 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:16 +0300
commit17e20fa084178ddcb16255f974dbde74fb93608b (patch)
tree39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc/server/grpc_request.h
parent97df5ca7413550bf233fc6c7210e292fca0a51af (diff)
downloadydb-17e20fa084178ddcb16255f974dbde74fb93608b.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/server/grpc_request.h')
-rw-r--r--library/cpp/grpc/server/grpc_request.h696
1 files changed, 348 insertions, 348 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index 5bd8d3902b..2841069d62 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -1,118 +1,118 @@
-#pragma once
-
+#pragma once
+
#include <google/protobuf/text_format.h>
#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
-
+
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/priority.h>
-
+
#include "grpc_response.h"
-#include "event_callback.h"
+#include "event_callback.h"
#include "grpc_async_ctx_base.h"
#include "grpc_counters.h"
-#include "grpc_request_base.h"
-#include "grpc_server.h"
+#include "grpc_request_base.h"
+#include "grpc_server.h"
#include "logger.h"
-
+
#include <util/system/hp_timer.h>
-#include <grpc++/server.h>
-#include <grpc++/server_context.h>
+#include <grpc++/server.h>
+#include <grpc++/server_context.h>
#include <grpc++/support/async_stream.h>
#include <grpc++/support/async_unary_call.h>
-#include <grpc++/support/byte_buffer.h>
-#include <grpc++/impl/codegen/async_stream.h>
-
+#include <grpc++/support/byte_buffer.h>
+#include <grpc++/impl/codegen/async_stream.h>
+
namespace NGrpc {
-
-class IStreamAdaptor {
-public:
- using TPtr = std::unique_ptr<IStreamAdaptor>;
- virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;
- virtual size_t ProcessNext() = 0;
- virtual ~IStreamAdaptor() = default;
-};
-
-IStreamAdaptor::TPtr CreateStreamAdaptor();
-
-///////////////////////////////////////////////////////////////////////////////
+
+class IStreamAdaptor {
+public:
+ using TPtr = std::unique_ptr<IStreamAdaptor>;
+ virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;
+ virtual size_t ProcessNext() = 0;
+ virtual ~IStreamAdaptor() = default;
+};
+
+IStreamAdaptor::TPtr CreateStreamAdaptor();
+
+///////////////////////////////////////////////////////////////////////////////
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>
class TGRpcRequestImpl
- : public TBaseAsyncContext<TService>
- , public IQueueEvent
- , public IRequestContextBase
-{
+ : public TBaseAsyncContext<TService>
+ , public IQueueEvent
+ , public IRequestContextBase
+{
using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-public:
- using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
- using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
- using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
-
+public:
+ using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
+ using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
+ grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
+ using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
+ grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
+
TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TRequestCallback requestCallback,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ TOnRequest cb,
+ TRequestCallback requestCallback,
const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(requestCallback)
- , StreamRequestCallback_(nullptr)
- , Name_(name)
+ : TBaseAsyncContext<TService>(service, cq)
+ , Server_(server)
+ , Cb_(cb)
+ , RequestCallback_(requestCallback)
+ , StreamRequestCallback_(nullptr)
+ , Name_(name)
, Logger_(std::move(logger))
- , Counters_(std::move(counters))
+ , Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
, Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ {
+ AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
+ Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
+ Y_VERIFY(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- }
-
+ FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
+ }
+
TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TStreamRequestCallback requestCallback,
+ typename TService::TCurrentGRpcService::AsyncService* service,
+ grpc::ServerCompletionQueue* cq,
+ TOnRequest cb,
+ TStreamRequestCallback requestCallback,
const char* name,
TLoggerPtr logger,
ICounterBlockPtr counters,
IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(nullptr)
- , StreamRequestCallback_(requestCallback)
- , Name_(name)
+ : TBaseAsyncContext<TService>(service, cq)
+ , Server_(server)
+ , Cb_(cb)
+ , RequestCallback_(nullptr)
+ , StreamRequestCallback_(requestCallback)
+ , Name_(name)
, Logger_(std::move(logger))
- , Counters_(std::move(counters))
+ , Counters_(std::move(counters))
, RequestLimiter_(std::move(limiter))
- , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
+ , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
, StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_VERIFY(Request_);
+ {
+ AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
+ Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
+ Y_VERIFY(Request_);
GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- StreamAdaptor_ = CreateStreamAdaptor();
- }
-
- TAsyncFinishResult GetFinishFuture() override {
- return FinishPromise_.GetFuture();
- }
-
+ FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
+ StreamAdaptor_ = CreateStreamAdaptor();
+ }
+
+ TAsyncFinishResult GetFinishFuture() override {
+ return FinishPromise_.GetFuture();
+ }
+
TString GetPeer() const override {
return TString(this->Context.peer());
}
@@ -121,7 +121,7 @@ public:
return Server_->SslServer();
}
- void Run() {
+ void Run() {
// Start request unless server is shutting down
if (auto guard = Server_->ProtectShutdown()) {
Ref(); //For grpc c runtime
@@ -135,28 +135,28 @@ public:
(&this->Context, Request_,
reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());
}
- }
- }
-
+ }
+ }
+
~TGRpcRequestImpl() {
- // No direct dtor call allowed
- Y_ASSERT(RefCount() == 0);
- }
-
- bool Execute(bool ok) override {
- return (this->*StateFunc_)(ok);
- }
-
- void DestroyRequest() override {
+ // No direct dtor call allowed
+ Y_ASSERT(RefCount() == 0);
+ }
+
+ bool Execute(bool ok) override {
+ return (this->*StateFunc_)(ok);
+ }
+
+ void DestroyRequest() override {
if (RequestRegistered_) {
Server_->DeregisterRequestCtx(this);
RequestRegistered_ = false;
}
- UnRef();
- }
-
- TInstant Deadline() const override {
- return TBaseAsyncContext<TService>::Deadline();
+ UnRef();
+ }
+
+ TInstant Deadline() const override {
+ return TBaseAsyncContext<TService>::Deadline();
}
TSet<TStringBuf> GetPeerMetaKeys() const override {
@@ -164,299 +164,299 @@ public:
}
TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
- return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
- }
-
+ return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
+ }
+
grpc_compression_level GetCompressionLevel() const override {
return TBaseAsyncContext<TService>::GetCompressionLevel();
}
- //! Get pointer to the request's message.
- const NProtoBuf::Message* GetRequest() const override {
- return Request_;
- }
-
- TAuthState& GetAuthState() override {
- return AuthState_;
- }
-
+ //! Get pointer to the request's message.
+ const NProtoBuf::Message* GetRequest() const override {
+ return Request_;
+ }
+
+ TAuthState& GetAuthState() override {
+ return AuthState_;
+ }
+
void Reply(NProtoBuf::Message* resp, ui32 status) override {
ResponseStatus = status;
- WriteDataOk(resp);
- }
-
+ WriteDataOk(resp);
+ }
+
void Reply(grpc::ByteBuffer* resp, ui32 status) override {
ResponseStatus = status;
- WriteByteDataOk(resp);
- }
-
+ WriteByteDataOk(resp);
+ }
+
void ReplyError(grpc::StatusCode code, const TString& msg) override {
- FinishGrpcStatus(code, msg, false);
- }
-
- void ReplyUnauthenticated(const TString& in) override {
- const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
- FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
- }
-
- void SetNextReplyCallback(TOnNextReply&& cb) override {
- NextReplyCb_ = cb;
- }
-
+ FinishGrpcStatus(code, msg, false);
+ }
+
+ void ReplyUnauthenticated(const TString& in) override {
+ const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
+ FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, false);
+ }
+
+ void SetNextReplyCallback(TOnNextReply&& cb) override {
+ NextReplyCb_ = cb;
+ }
+
void AddTrailingMetadata(const TString& key, const TString& value) override {
- this->Context.AddTrailingMetadata(key, value);
- }
-
- void FinishStreamingOk() override {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
- auto cb = [this]() {
- StateFunc_ = &TThis::SetFinishDone;
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
- this->Context.peer().c_str());
-
- StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
-
- google::protobuf::Arena* GetArena() override {
- return &Arena_;
- }
-
+ this->Context.AddTrailingMetadata(key, value);
+ }
+
+ void FinishStreamingOk() override {
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,
+ this->Context.peer().c_str());
+ auto cb = [this]() {
+ StateFunc_ = &TThis::SetFinishDone;
+ GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
+ this->Context.peer().c_str());
+
+ StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+
+ google::protobuf::Arena* GetArena() override {
+ return &Arena_;
+ }
+
void UseDatabase(const TString& database) override {
Counters_->UseDatabase(database);
}
-private:
- void Clone() {
- if (!Server_->IsShuttingDown()) {
- if (RequestCallback_) {
+private:
+ void Clone() {
+ if (!Server_->IsShuttingDown()) {
+ if (RequestCallback_) {
MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- } else {
+ } else {
MakeIntrusive<TThis>(
Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- }
- }
- }
-
- void WriteDataOk(NProtoBuf::Message* resp) {
- auto makeResponseString = [&] {
- TString x;
+ }
+ }
+ }
+
+ void WriteDataOk(NProtoBuf::Message* resp) {
+ auto makeResponseString = [&] {
+ TString x;
TOutProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*resp, &x);
- return x;
- };
-
- auto sz = (size_t)resp->ByteSize();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
- makeResponseString().data(), this->Context.peer().c_str());
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(*resp, &x);
+ return x;
+ };
+
+ auto sz = (size_t)resp->ByteSize();
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
+ makeResponseString().data(), this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
- Y_VERIFY(this->Context.c_call());
+ ResponseSize = sz;
+ Y_VERIFY(this->Context.c_call());
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
+ this, Name_, makeResponseString().data(), this->Context.peer().c_str());
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid message copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz, &makeResponseString]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (pushed to grpc)",
+ this, Name_, makeResponseString().data(), this->Context.peer().c_str());
+ StateFunc_ = &TThis::NextReply;
+ ResponseSize += sz;
StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void WriteByteDataOk(grpc::ByteBuffer* resp) {
- auto sz = resp->Length();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
- this->Context.peer().c_str());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+ }
+
+ void WriteByteDataOk(grpc::ByteBuffer* resp) {
+ auto sz = resp->Length();
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
+ this->Context.peer().c_str());
StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
+ ResponseSize = sz;
Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
+ this->Context.peer().c_str());
// because of std::function cannot hold move-only captured object
// we allocate shared object on heap to avoid buffer copy
auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
auto cb = [this, uResp = std::move(uResp), sz]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
- this, Name_, this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
+ this, Name_, this->Context.peer().c_str());
+ StateFunc_ = &TThis::NextReply;
+ ResponseSize += sz;
StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) {
- Y_VERIFY(code != grpc::OK);
- if (code == grpc::StatusCode::UNAUTHENTICATED) {
- Counters_->CountNotAuthenticated();
- } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
- Counters_->CountResourceExhausted();
- }
-
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
- Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), false);
+ }
+ }
+
+ void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, bool urgent) {
+ Y_VERIFY(code != grpc::OK);
+ if (code == grpc::StatusCode::UNAUTHENTICATED) {
+ Counters_->CountNotAuthenticated();
+ } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
+ Counters_->CountResourceExhausted();
+ }
+
+ if (Writer_) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
+ Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
+ StateFunc_ = &TThis::SetFinishError;
TOut resp;
Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg), GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- auto cb = [this, code, msg]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (pushed to grpc)", this, Name_, msg.c_str(),
- this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
- StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), urgent);
- }
- }
-
- bool SetRequestDone(bool ok) {
- auto makeRequestString = [&] {
- TString resp;
- if (ok) {
+ } else {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
+ " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
+ auto cb = [this, code, msg]() {
+ GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
+ " (pushed to grpc)", this, Name_, msg.c_str(),
+ this->Context.peer().c_str(), (int)code);
+ StateFunc_ = &TThis::SetFinishError;
+ StreamWriter_->Finish(grpc::Status(code, msg), GetGRpcTag());
+ };
+ StreamAdaptor_->Enqueue(std::move(cb), urgent);
+ }
+ }
+
+ bool SetRequestDone(bool ok) {
+ auto makeRequestString = [&] {
+ TString resp;
+ if (ok) {
TInProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*Request_, &resp);
- } else {
- resp = "<not ok>";
- }
- return resp;
- };
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(*Request_, &resp);
+ } else {
+ resp = "<not ok>";
+ }
+ return resp;
+ };
GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
-
- if (this->Context.c_call() == nullptr) {
+
+ if (this->Context.c_call() == nullptr) {
Y_VERIFY(!ok);
- // One ref by OnFinishTag, grpc will not call this tag if no request received
- UnRef();
+ // One ref by OnFinishTag, grpc will not call this tag if no request received
+ UnRef();
} else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
// Request cannot be registered due to shutdown
// It's unsafe to continue, so drop this request without processing
GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
this->Context.TryCancel();
return false;
- }
-
- Clone(); // TODO: Request pool?
- if (!ok) {
- Counters_->CountNotOkRequest();
- return false;
- }
-
+ }
+
+ Clone(); // TODO: Request pool?
+ if (!ok) {
+ Counters_->CountNotOkRequest();
+ return false;
+ }
+
if (IncRequest()) {
- // Adjust counters.
- RequestSize = Request_->ByteSize();
- Counters_->StartProcessing(RequestSize);
+ // Adjust counters.
+ RequestSize = Request_->ByteSize();
+ Counters_->StartProcessing(RequestSize);
RequestTimer.Reset();
-
- if (!SslServer()) {
- Counters_->CountRequestWithoutTls();
- }
-
- //TODO: Move this in to grpc_request_proxy
+
+ if (!SslServer()) {
+ Counters_->CountRequestWithoutTls();
+ }
+
+ //TODO: Move this in to grpc_request_proxy
auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database"));
- if (maybeDatabase.empty()) {
- Counters_->CountRequestsWithoutDatabase();
- }
+ if (maybeDatabase.empty()) {
+ Counters_->CountRequestsWithoutDatabase();
+ }
auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));
- if (maybeToken.empty() || maybeToken[0].empty()) {
+ if (maybeToken.empty() || maybeToken[0].empty()) {
TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
- Counters_->CountRequestsWithoutToken();
- GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
- "Name# %s data# %s peer# %s database# %s", this, Name_,
- makeRequestString().data(), this->Context.peer().c_str(), db.c_str());
- }
-
- // Handle current request.
- Cb_(this);
- } else {
- //This request has not been counted
- SkipUpdateCountersOnError = true;
- FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true);
- }
- return true;
- }
-
- bool NextReply(bool ok) {
- auto logCb = [this, ok](int left) {
- GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str(), left);
- };
-
- if (!ok) {
- logCb(-1);
+ Counters_->CountRequestsWithoutToken();
+ GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
+ "Name# %s data# %s peer# %s database# %s", this, Name_,
+ makeRequestString().data(), this->Context.peer().c_str(), db.c_str());
+ }
+
+ // Handle current request.
+ Cb_(this);
+ } else {
+ //This request has not been counted
+ SkipUpdateCountersOnError = true;
+ FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", true);
+ }
+ return true;
+ }
+
+ bool NextReply(bool ok) {
+ auto logCb = [this, ok](int left) {
+ GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
+ ok ? "true" : "false", this->Context.peer().c_str(), left);
+ };
+
+ if (!ok) {
+ logCb(-1);
DecRequest();
- Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
- TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- Ref(); // To prevent destroy during this call in case of execution Finish
- size_t left = StreamAdaptor_->ProcessNext();
- logCb(left);
- if (NextReplyCb_) {
- NextReplyCb_(left);
- }
- // Now it is safe to destroy even if Finish was called
- UnRef();
- return true;
- }
-
- bool SetFinishDone(bool ok) {
+ Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
+ TDuration::Seconds(RequestTimer.Passed()));
+ return false;
+ }
+
+ Ref(); // To prevent destroy during this call in case of execution Finish
+ size_t left = StreamAdaptor_->ProcessNext();
+ logCb(left);
+ if (NextReplyCb_) {
+ NextReplyCb_(left);
+ }
+ // Now it is safe to destroy even if Finish was called
+ UnRef();
+ return true;
+ }
+
+ bool SetFinishDone(bool ok) {
GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- //PrintBackTrace();
+ ok ? "true" : "false", this->Context.peer().c_str());
+ //PrintBackTrace();
DecRequest();
Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- bool SetFinishError(bool ok) {
+ return false;
+ }
+
+ bool SetFinishError(bool ok) {
GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- if (!SkipUpdateCountersOnError) {
+ ok ? "true" : "false", this->Context.peer().c_str());
+ if (!SkipUpdateCountersOnError) {
DecRequest();
Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
- }
- return false;
- }
-
- // Returns pointer to IQueueEvent to pass into grpc c runtime
- // Implicit C style cast from this to void* is wrong due to multiple inheritance
- void* GetGRpcTag() {
- return static_cast<IQueueEvent*>(this);
- }
-
- void OnFinish(EQueueEventStatus evStatus) {
- if (this->Context.IsCancelled()) {
- FinishPromise_.SetValue(EFinishStatus::CANCEL);
- } else {
- FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
- }
- }
-
+ }
+ return false;
+ }
+
+ // Returns pointer to IQueueEvent to pass into grpc c runtime
+ // Implicit C style cast from this to void* is wrong due to multiple inheritance
+ void* GetGRpcTag() {
+ return static_cast<IQueueEvent*>(this);
+ }
+
+ void OnFinish(EQueueEventStatus evStatus) {
+ if (this->Context.IsCancelled()) {
+ FinishPromise_.SetValue(EFinishStatus::CANCEL);
+ } else {
+ FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
+ }
+ }
+
bool IncRequest() {
if (!Server_->IncRequest())
return false;
@@ -480,36 +480,36 @@ private:
}
using TStateFunc = bool (TThis::*)(bool);
- TService* Server_;
- TOnRequest Cb_;
- TRequestCallback RequestCallback_;
- TStreamRequestCallback StreamRequestCallback_;
+ TService* Server_;
+ TOnRequest Cb_;
+ TRequestCallback RequestCallback_;
+ TStreamRequestCallback StreamRequestCallback_;
const char* const Name_;
TLoggerPtr Logger_;
ICounterBlockPtr Counters_;
IGRpcRequestLimiterPtr RequestLimiter_;
-
+
THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
- THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
- TStateFunc StateFunc_;
- TIn* Request_;
-
- google::protobuf::Arena Arena_;
- TOnNextReply NextReplyCb_;
- ui32 RequestSize = 0;
- ui32 ResponseSize = 0;
+ THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
+ TStateFunc StateFunc_;
+ TIn* Request_;
+
+ google::protobuf::Arena Arena_;
+ TOnNextReply NextReplyCb_;
+ ui32 RequestSize = 0;
+ ui32 ResponseSize = 0;
ui32 ResponseStatus = 0;
THPTimer RequestTimer;
- TAuthState AuthState_ = 0;
+ TAuthState AuthState_ = 0;
bool RequestRegistered_ = false;
-
+
using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
- NThreading::TPromise<EFinishStatus> FinishPromise_;
- bool SkipUpdateCountersOnError = false;
- IStreamAdaptor::TPtr StreamAdaptor_;
-};
-
+ NThreading::TPromise<EFinishStatus> FinishPromise_;
+ bool SkipUpdateCountersOnError = false;
+ IStreamAdaptor::TPtr StreamAdaptor_;
+};
+
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {
using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;