diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-09-30 13:04:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-30 14:04:14 +0300 |
commit | fbe6acaa936ce93f4a05735e08b5d22783c5c04b (patch) | |
tree | e3a86d4c656c3c5830229a7e555719fdc4c12d3b | |
parent | 5dc26211e73718ac1359b62bc5af18515c76f88e (diff) | |
download | ydb-fbe6acaa936ce93f4a05735e08b5d22783c5c04b.tar.gz |
[refactoring] Get rid of "GetPeer()" grpc request implementation copy paste and unused functions (#9814)
-rw-r--r-- | ydb/core/client/server/grpc_server.cpp | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_grpc/local_grpc.h | 5 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 8 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_deferrable.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/ydb_over_fq/rpc_base.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_streaming/grpc_streaming.h | 28 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/control_plane_common.h | 4 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.cpp | 4 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.h | 1 | ||||
-rw-r--r-- | ydb/library/grpc/server/grpc_async_ctx_base.h | 2 | ||||
-rw-r--r-- | ydb/library/grpc/server/grpc_request.h | 16 | ||||
-rw-r--r-- | ydb/library/grpc/server/grpc_request_base.h | 3 |
14 files changed, 25 insertions, 82 deletions
diff --git a/ydb/core/client/server/grpc_server.cpp b/ydb/core/client/server/grpc_server.cpp index 02326b38d2b..c1e53a4ea2b 100644 --- a/ydb/core/client/server/grpc_server.cpp +++ b/ydb/core/client/server/grpc_server.cpp @@ -239,7 +239,7 @@ public: } TString GetPeer() const override { - return GetPeerName(); + return TGrpcBaseAsyncContext::GetPeer(); } TVector<TStringBuf> FindClientCert() const override { @@ -253,7 +253,7 @@ private: void Finish(const TOut& resp, ui32 status) { LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s data# %s peer# %s", this, - Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeerName().c_str()); + Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeer().c_str()); ResponseSize = resp.ByteSize(); ResponseStatus = status; StateFunc = &TSimpleRequest::FinishDone; @@ -266,7 +266,7 @@ private: TOut resp; TString msg = "no resource"; LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s nodata (no resources) peer# %s", this, - Name, GetPeerName().c_str()); + Name, GetPeer().c_str()); StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing; OnBeforeCall(); @@ -280,7 +280,7 @@ private: OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] received request Name# %s ok# %s data# %s peer# %s current inflight# %li", this, - Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeerName().c_str(), Server->GetCurrentInFlight()); + Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeer().c_str(), Server->GetCurrentInFlight()); if (Context.c_call() == nullptr) { Y_ABORT_UNLESS(!ok); @@ -318,7 +318,7 @@ private: bool FinishDone(bool ok) { OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this, - Name, ok ? "true" : "false", GetPeerName().c_str()); + Name, ok ? "true" : "false", GetPeer().c_str()); Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); Server->DecRequest(); @@ -330,7 +330,7 @@ private: bool FinishDoneWithoutProcessing(bool ok) { OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this, - Name, ok ? "true" : "false", GetPeerName().c_str()); + Name, ok ? "true" : "false", GetPeer().c_str()); return false; } diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 7885c14f066..65bbceccbb5 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -439,7 +439,6 @@ class IRequestCtx friend class TProtoResponseHelper; public: using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl; - virtual google::protobuf::Message* GetRequestMut() = 0; virtual void SetRuHeader(ui64 ru) = 0; virtual void AddServerHint(const TString& hint) = 0; @@ -1166,13 +1165,6 @@ public: return request; } - template <typename T> - static TRequest* GetProtoRequestMut(const T& req) { - auto request = dynamic_cast<TRequest*>(req->GetRequestMut()); - Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); - return request; - } - const TRequest* GetProtoRequest() const { return GetProtoRequest(this); } @@ -1272,10 +1264,6 @@ public: return Ctx_->GetRequest(); } - google::protobuf::Message* GetRequestMut() override { - return Ctx_->GetRequestMut(); - } - void SetRespHook(TRespHook&& hook) override { RespHook = std::move(hook); } diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h index 8c00724d81c..6da8e538097 100644 --- a/ydb/core/grpc_services/local_grpc/local_grpc.h +++ b/ydb/core/grpc_services/local_grpc/local_grpc.h @@ -130,11 +130,6 @@ public: return &Request_; } - //! Get mutable pointer to the request's message. - NProtoBuf::Message* GetRequestMut() override { - return &Request_; - } - void Reply(NProtoBuf::Message* proto, ui32 status = 0) override { Y_UNUSED(status); TResp* resp = dynamic_cast<TResp*>(proto); diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index eb4b4a8deee..63b4cbe0687 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -196,10 +196,6 @@ public: return &Request; } - google::protobuf::Message* GetRequestMut() override { - return &Request; - } - void SetFinishAction(std::function<void()>&&) override {} bool IsClientLost() const override { return false; } @@ -417,10 +413,6 @@ protected: return GetBaseRequest().GetRequest(); } - NProtoBuf::Message* GetRequestMut() override { - return GetBaseRequest().GetRequestMut(); - } - TAsyncFinishResult GetFinishFuture() override { return FinishPromise.GetFuture(); } diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index 8de3a317cdd..b14564a44ff 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -64,10 +64,6 @@ public: return TRequest::GetProtoRequest(Request_); } - typename TRequest::TRequest* GetProtoRequestMut() { - return TRequest::GetProtoRequestMut(Request_); - } - Ydb::Operations::OperationParams::OperationMode GetOperationMode() const { return GetProtoRequest()->operation_params().operation_mode(); } diff --git a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h index e8f7a402042..4ecc392b1ba 100644 --- a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h +++ b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h @@ -71,10 +71,6 @@ public: return TReq::GetProtoRequest(Request_); } - TRequest* GetProtoRequestMut() noexcept { - return TReq::GetProtoRequestMut(Request_); - } - IRequestNoOpCtx& Request() noexcept { return *Request_; } private: diff --git a/ydb/core/grpc_streaming/grpc_streaming.h b/ydb/core/grpc_streaming/grpc_streaming.h index c6d6fd57e3a..bfb80c7b949 100644 --- a/ydb/core/grpc_streaming/grpc_streaming.h +++ b/ydb/core/grpc_streaming/grpc_streaming.h @@ -224,7 +224,7 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); if (status == NYdbGrpc::EQueueEventStatus::ERROR) { // Don't bother registering if accept failed @@ -265,7 +265,7 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); bool success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -285,7 +285,7 @@ private: void Cancel() { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s", this, Name, - this->GetPeerName().c_str()); + this->GetPeer().c_str()); this->Context.TryCancel(); } @@ -298,7 +298,7 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s", this, Name, actor.ToString().c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto guard = SingleThreaded.Enforce(); @@ -322,7 +322,7 @@ private: bool Read() { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s", this, Name, - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto guard = SingleThreaded.Enforce(); @@ -350,7 +350,7 @@ private: this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); // Take current in-progress read first auto read = std::move(ReadInProgress); @@ -373,7 +373,7 @@ private: Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled); if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)", - this, Name, this->GetPeerName().c_str()); + this, Name, this->GetPeer().c_str()); Server->DeregisterRequestCtx(this); break; } @@ -391,14 +391,14 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s", this, Name, NYdbGrpc::FormatMessage<TOut>(message).c_str(), - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast<int>(status->error_code()), status->error_message().c_str()); } else { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s", this, Name, NYdbGrpc::FormatMessage<TOut>(message).c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); } Y_ABORT_UNLESS(!options.is_corked(), @@ -453,7 +453,7 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto event = MakeHolder<typename IContext::TEvWriteFinished>(); event->Success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -506,7 +506,7 @@ private: bool Finish(const grpc::Status& status) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s", this, Name, - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast<int>(status.error_code()), status.error_message().c_str()); @@ -542,7 +542,7 @@ private: LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast<int>(Status->error_code()), Status->error_message().c_str()); @@ -577,7 +577,7 @@ private: while ((flags & FlagRegistered) && ReadQueue.load() == 0) { if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)", - this, Name, this->GetPeerName().c_str()); + this, Name, this->GetPeer().c_str()); Server->DeregisterRequestCtx(this); break; } @@ -646,7 +646,7 @@ private: } TString GetPeerName() const override { - return Self->GetPeerName(); + return Self->GetPeer(); } TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override { diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h index 597d808a269..9895ab2b4f1 100644 --- a/ydb/core/kafka_proxy/actors/control_plane_common.h +++ b/ydb/core/kafka_proxy/actors/control_plane_common.h @@ -292,10 +292,6 @@ public: return DummyAuditLogParts; }; - google::protobuf::Message* GetRequestMut() override { - return nullptr; - }; - void SetRuHeader(ui64 ru) override { Y_UNUSED(ru); }; diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp index 521f8598f9b..2c6d5632325 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -127,10 +127,6 @@ public: return DummyAuditLogParts; }; - google::protobuf::Message* GetRequestMut() override { - return nullptr; - }; - void SetRuHeader(ui64 ru) override { Y_UNUSED(ru); }; diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp index 19624005a44..42ef9949363 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.cpp +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -19,10 +19,6 @@ namespace NKikimr::NPublicHttp { return Request.get(); } - NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() { - return Request.get(); - } - NYdbGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() { return AuthState; } diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index 35f47e3bdd6..35d05be0bd8 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -24,7 +24,6 @@ private: public: TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender); virtual const NProtoBuf::Message* GetRequest() const; - virtual NProtoBuf::Message* GetRequestMut(); virtual NYdbGrpc::TAuthState& GetAuthState(); virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0); virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT); diff --git a/ydb/library/grpc/server/grpc_async_ctx_base.h b/ydb/library/grpc/server/grpc_async_ctx_base.h index 5ece9974317..4837cc5b1d4 100644 --- a/ydb/library/grpc/server/grpc_async_ctx_base.h +++ b/ydb/library/grpc/server/grpc_async_ctx_base.h @@ -25,7 +25,7 @@ public: { } - TString GetPeerName() const { + TString GetPeer() const { // Decode URL-encoded square brackets auto ip = Context.peer(); CGIUnescape(ip); diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index 3be0cb44ee9..67435f50298 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -120,13 +120,6 @@ public: return bool(StreamAdaptor_); } - TString GetPeer() const override { - // Decode URL-encoded square brackets - auto ip = TString(this->Context.peer()); - CGIUnescape(ip); - return ip; - } - bool SslServer() const override { return Server_->SslServer(); } @@ -168,6 +161,10 @@ public: UnRef(); } + TString GetPeer() const override { + return TBaseAsyncContext<TService>::GetPeer(); + } + TInstant Deadline() const override { return TBaseAsyncContext<TService>::Deadline(); } @@ -193,11 +190,6 @@ public: return Request_; } - NProtoBuf::Message* GetRequestMut() override { - return Request_; - } - - TAuthState& GetAuthState() override { return AuthState_; } diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h index 2ad89993c85..a43c1c41d61 100644 --- a/ydb/library/grpc/server/grpc_request_base.h +++ b/ydb/library/grpc/server/grpc_request_base.h @@ -58,9 +58,6 @@ public: //! Get pointer to the request's message. virtual const NProtoBuf::Message* GetRequest() const = 0; - //! Get mutable pointer to the request's message. - virtual NProtoBuf::Message* GetRequestMut() = 0; - //! Get current auth state virtual TAuthState& GetAuthState() = 0; |