aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-09-30 13:04:14 +0200
committerGitHub <noreply@github.com>2024-09-30 14:04:14 +0300
commitfbe6acaa936ce93f4a05735e08b5d22783c5c04b (patch)
treee3a86d4c656c3c5830229a7e555719fdc4c12d3b
parent5dc26211e73718ac1359b62bc5af18515c76f88e (diff)
downloadydb-fbe6acaa936ce93f4a05735e08b5d22783c5c04b.tar.gz
[refactoring] Get rid of "GetPeer()" grpc request implementation copy paste and unused functions (#9814)
-rw-r--r--ydb/core/client/server/grpc_server.cpp12
-rw-r--r--ydb/core/grpc_services/base/base.h12
-rw-r--r--ydb/core/grpc_services/local_grpc/local_grpc.h5
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h8
-rw-r--r--ydb/core/grpc_services/rpc_deferrable.h4
-rw-r--r--ydb/core/grpc_services/ydb_over_fq/rpc_base.h4
-rw-r--r--ydb/core/grpc_streaming/grpc_streaming.h28
-rw-r--r--ydb/core/kafka_proxy/actors/control_plane_common.h4
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp4
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.cpp4
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.h1
-rw-r--r--ydb/library/grpc/server/grpc_async_ctx_base.h2
-rw-r--r--ydb/library/grpc/server/grpc_request.h16
-rw-r--r--ydb/library/grpc/server/grpc_request_base.h3
14 files changed, 25 insertions, 82 deletions
diff --git a/ydb/core/client/server/grpc_server.cpp b/ydb/core/client/server/grpc_server.cpp
index 02326b38d2b..c1e53a4ea2b 100644
--- a/ydb/core/client/server/grpc_server.cpp
+++ b/ydb/core/client/server/grpc_server.cpp
@@ -239,7 +239,7 @@ public:
}
TString GetPeer() const override {
- return GetPeerName();
+ return TGrpcBaseAsyncContext::GetPeer();
}
TVector<TStringBuf> FindClientCert() const override {
@@ -253,7 +253,7 @@ private:
void Finish(const TOut& resp, ui32 status) {
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s data# %s peer# %s", this,
- Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeerName().c_str());
+ Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeer().c_str());
ResponseSize = resp.ByteSize();
ResponseStatus = status;
StateFunc = &TSimpleRequest::FinishDone;
@@ -266,7 +266,7 @@ private:
TOut resp;
TString msg = "no resource";
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s nodata (no resources) peer# %s", this,
- Name, GetPeerName().c_str());
+ Name, GetPeer().c_str());
StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing;
OnBeforeCall();
@@ -280,7 +280,7 @@ private:
OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] received request Name# %s ok# %s data# %s peer# %s current inflight# %li", this,
- Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeerName().c_str(), Server->GetCurrentInFlight());
+ Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeer().c_str(), Server->GetCurrentInFlight());
if (Context.c_call() == nullptr) {
Y_ABORT_UNLESS(!ok);
@@ -318,7 +318,7 @@ private:
bool FinishDone(bool ok) {
OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this,
- Name, ok ? "true" : "false", GetPeerName().c_str());
+ Name, ok ? "true" : "false", GetPeer().c_str());
Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
Server->DecRequest();
@@ -330,7 +330,7 @@ private:
bool FinishDoneWithoutProcessing(bool ok) {
OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this,
- Name, ok ? "true" : "false", GetPeerName().c_str());
+ Name, ok ? "true" : "false", GetPeer().c_str());
return false;
}
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 7885c14f066..65bbceccbb5 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -439,7 +439,6 @@ class IRequestCtx
friend class TProtoResponseHelper;
public:
using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl;
- virtual google::protobuf::Message* GetRequestMut() = 0;
virtual void SetRuHeader(ui64 ru) = 0;
virtual void AddServerHint(const TString& hint) = 0;
@@ -1166,13 +1165,6 @@ public:
return request;
}
- template <typename T>
- static TRequest* GetProtoRequestMut(const T& req) {
- auto request = dynamic_cast<TRequest*>(req->GetRequestMut());
- Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper");
- return request;
- }
-
const TRequest* GetProtoRequest() const {
return GetProtoRequest(this);
}
@@ -1272,10 +1264,6 @@ public:
return Ctx_->GetRequest();
}
- google::protobuf::Message* GetRequestMut() override {
- return Ctx_->GetRequestMut();
- }
-
void SetRespHook(TRespHook&& hook) override {
RespHook = std::move(hook);
}
diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h
index 8c00724d81c..6da8e538097 100644
--- a/ydb/core/grpc_services/local_grpc/local_grpc.h
+++ b/ydb/core/grpc_services/local_grpc/local_grpc.h
@@ -130,11 +130,6 @@ public:
return &Request_;
}
- //! Get mutable pointer to the request's message.
- NProtoBuf::Message* GetRequestMut() override {
- return &Request_;
- }
-
void Reply(NProtoBuf::Message* proto, ui32 status = 0) override {
Y_UNUSED(status);
TResp* resp = dynamic_cast<TResp*>(proto);
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index eb4b4a8deee..63b4cbe0687 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -196,10 +196,6 @@ public:
return &Request;
}
- google::protobuf::Message* GetRequestMut() override {
- return &Request;
- }
-
void SetFinishAction(std::function<void()>&&) override {}
bool IsClientLost() const override { return false; }
@@ -417,10 +413,6 @@ protected:
return GetBaseRequest().GetRequest();
}
- NProtoBuf::Message* GetRequestMut() override {
- return GetBaseRequest().GetRequestMut();
- }
-
TAsyncFinishResult GetFinishFuture() override {
return FinishPromise.GetFuture();
}
diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h
index 8de3a317cdd..b14564a44ff 100644
--- a/ydb/core/grpc_services/rpc_deferrable.h
+++ b/ydb/core/grpc_services/rpc_deferrable.h
@@ -64,10 +64,6 @@ public:
return TRequest::GetProtoRequest(Request_);
}
- typename TRequest::TRequest* GetProtoRequestMut() {
- return TRequest::GetProtoRequestMut(Request_);
- }
-
Ydb::Operations::OperationParams::OperationMode GetOperationMode() const {
return GetProtoRequest()->operation_params().operation_mode();
}
diff --git a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h
index e8f7a402042..4ecc392b1ba 100644
--- a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h
+++ b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h
@@ -71,10 +71,6 @@ public:
return TReq::GetProtoRequest(Request_);
}
- TRequest* GetProtoRequestMut() noexcept {
- return TReq::GetProtoRequestMut(Request_);
- }
-
IRequestNoOpCtx& Request() noexcept { return *Request_; }
private:
diff --git a/ydb/core/grpc_streaming/grpc_streaming.h b/ydb/core/grpc_streaming/grpc_streaming.h
index c6d6fd57e3a..bfb80c7b949 100644
--- a/ydb/core/grpc_streaming/grpc_streaming.h
+++ b/ydb/core/grpc_streaming/grpc_streaming.h
@@ -224,7 +224,7 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
if (status == NYdbGrpc::EQueueEventStatus::ERROR) {
// Don't bother registering if accept failed
@@ -265,7 +265,7 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
bool success = status == NYdbGrpc::EQueueEventStatus::OK;
@@ -285,7 +285,7 @@ private:
void Cancel() {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s",
this, Name,
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
this->Context.TryCancel();
}
@@ -298,7 +298,7 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s",
this, Name,
actor.ToString().c_str(),
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
auto guard = SingleThreaded.Enforce();
@@ -322,7 +322,7 @@ private:
bool Read() {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s",
this, Name,
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
auto guard = SingleThreaded.Enforce();
@@ -350,7 +350,7 @@ private:
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(),
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
// Take current in-progress read first
auto read = std::move(ReadInProgress);
@@ -373,7 +373,7 @@ private:
Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled);
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)",
- this, Name, this->GetPeerName().c_str());
+ this, Name, this->GetPeer().c_str());
Server->DeregisterRequestCtx(this);
break;
}
@@ -391,14 +391,14 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s",
this, Name,
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
- this->GetPeerName().c_str(),
+ this->GetPeer().c_str(),
static_cast<int>(status->error_code()),
status->error_message().c_str());
} else {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s",
this, Name,
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
}
Y_ABORT_UNLESS(!options.is_corked(),
@@ -453,7 +453,7 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeerName().c_str());
+ this->GetPeer().c_str());
auto event = MakeHolder<typename IContext::TEvWriteFinished>();
event->Success = status == NYdbGrpc::EQueueEventStatus::OK;
@@ -506,7 +506,7 @@ private:
bool Finish(const grpc::Status& status) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s",
this, Name,
- this->GetPeerName().c_str(),
+ this->GetPeer().c_str(),
static_cast<int>(status.error_code()),
status.error_message().c_str());
@@ -542,7 +542,7 @@ private:
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
- this->GetPeerName().c_str(),
+ this->GetPeer().c_str(),
static_cast<int>(Status->error_code()),
Status->error_message().c_str());
@@ -577,7 +577,7 @@ private:
while ((flags & FlagRegistered) && ReadQueue.load() == 0) {
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)",
- this, Name, this->GetPeerName().c_str());
+ this, Name, this->GetPeer().c_str());
Server->DeregisterRequestCtx(this);
break;
}
@@ -646,7 +646,7 @@ private:
}
TString GetPeerName() const override {
- return Self->GetPeerName();
+ return Self->GetPeer();
}
TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h
index 597d808a269..9895ab2b4f1 100644
--- a/ydb/core/kafka_proxy/actors/control_plane_common.h
+++ b/ydb/core/kafka_proxy/actors/control_plane_common.h
@@ -292,10 +292,6 @@ public:
return DummyAuditLogParts;
};
- google::protobuf::Message* GetRequestMut() override {
- return nullptr;
- };
-
void SetRuHeader(ui64 ru) override {
Y_UNUSED(ru);
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
index 521f8598f9b..2c6d5632325 100644
--- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
@@ -127,10 +127,6 @@ public:
return DummyAuditLogParts;
};
- google::protobuf::Message* GetRequestMut() override {
- return nullptr;
- };
-
void SetRuHeader(ui64 ru) override {
Y_UNUSED(ru);
};
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp
index 19624005a44..42ef9949363 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.cpp
+++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp
@@ -19,10 +19,6 @@ namespace NKikimr::NPublicHttp {
return Request.get();
}
- NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() {
- return Request.get();
- }
-
NYdbGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() {
return AuthState;
}
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h
index 35f47e3bdd6..35d05be0bd8 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.h
+++ b/ydb/core/public_http/grpc_request_context_wrapper.h
@@ -24,7 +24,6 @@ private:
public:
TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender);
virtual const NProtoBuf::Message* GetRequest() const;
- virtual NProtoBuf::Message* GetRequestMut();
virtual NYdbGrpc::TAuthState& GetAuthState();
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT);
diff --git a/ydb/library/grpc/server/grpc_async_ctx_base.h b/ydb/library/grpc/server/grpc_async_ctx_base.h
index 5ece9974317..4837cc5b1d4 100644
--- a/ydb/library/grpc/server/grpc_async_ctx_base.h
+++ b/ydb/library/grpc/server/grpc_async_ctx_base.h
@@ -25,7 +25,7 @@ public:
{
}
- TString GetPeerName() const {
+ TString GetPeer() const {
// Decode URL-encoded square brackets
auto ip = Context.peer();
CGIUnescape(ip);
diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h
index 3be0cb44ee9..67435f50298 100644
--- a/ydb/library/grpc/server/grpc_request.h
+++ b/ydb/library/grpc/server/grpc_request.h
@@ -120,13 +120,6 @@ public:
return bool(StreamAdaptor_);
}
- TString GetPeer() const override {
- // Decode URL-encoded square brackets
- auto ip = TString(this->Context.peer());
- CGIUnescape(ip);
- return ip;
- }
-
bool SslServer() const override {
return Server_->SslServer();
}
@@ -168,6 +161,10 @@ public:
UnRef();
}
+ TString GetPeer() const override {
+ return TBaseAsyncContext<TService>::GetPeer();
+ }
+
TInstant Deadline() const override {
return TBaseAsyncContext<TService>::Deadline();
}
@@ -193,11 +190,6 @@ public:
return Request_;
}
- NProtoBuf::Message* GetRequestMut() override {
- return Request_;
- }
-
-
TAuthState& GetAuthState() override {
return AuthState_;
}
diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h
index 2ad89993c85..a43c1c41d61 100644
--- a/ydb/library/grpc/server/grpc_request_base.h
+++ b/ydb/library/grpc/server/grpc_request_base.h
@@ -58,9 +58,6 @@ public:
//! Get pointer to the request's message.
virtual const NProtoBuf::Message* GetRequest() const = 0;
- //! Get mutable pointer to the request's message.
- virtual NProtoBuf::Message* GetRequestMut() = 0;
-
//! Get current auth state
virtual TAuthState& GetAuthState() = 0;