diff options
author | akozhikhov <akozhikhov@yandex-team.com> | 2024-12-02 00:06:12 +0300 |
---|---|---|
committer | akozhikhov <akozhikhov@yandex-team.com> | 2024-12-02 00:16:28 +0300 |
commit | ae26d2770a2299c4729defed61d92dfbf8bd1bf8 (patch) | |
tree | c4fcf0d7811b1704ac04cced2e74c163b817c60b | |
parent | 279ff59b5bf4c2a75b36c0f9574e11cefef1c996 (diff) | |
download | ydb-ae26d2770a2299c4729defed61d92dfbf8bd1bf8.tar.gz |
YT-23393: Do not forget to consider response for timed-out request in rpc sensors
commit_hash:732e9977c555b805ace8f6961d33dc1dae9d3efe
-rw-r--r-- | yt/yt/core/rpc/bus/channel.cpp | 32 | ||||
-rw-r--r-- | yt/yt/core/rpc/channel_detail.cpp | 56 | ||||
-rw-r--r-- | yt/yt/core/rpc/channel_detail.h | 12 | ||||
-rw-r--r-- | yt/yt/core/rpc/grpc/channel.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/http/channel.cpp | 11 | ||||
-rw-r--r-- | yt/yt/core/rpc/message.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/server_detail.cpp | 3 | ||||
-rw-r--r-- | yt/yt_proto/yt/core/rpc/proto/rpc.proto | 4 |
8 files changed, 104 insertions, 18 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index 9edfe97ecb..322d1347fc 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -897,16 +897,40 @@ private: auto guard = Guard(*bucket); if (bucket->Terminated) { - YT_LOG_WARNING("Response received via a terminated channel (RequestId: %v)", - requestId); + YT_LOG_WARNING("Response received via a terminated channel " + "(RequestId: %v, Service: %v, Method: %v, BodySize: %v, AttachmentSize: %v)", + requestId, + header.service(), + header.method(), + GetMessageBodySize(message), + GetTotalMessageAttachmentSize(message)); + return; } auto it = bucket->ActiveRequestMap.find(requestId); if (it == bucket->ActiveRequestMap.end()) { // This may happen when the other party responds to an already timed-out request. - YT_LOG_DEBUG("Response for an incorrect or obsolete request received (RequestId: %v)", - requestId); + YT_LOG_DEBUG("Response for an incorrect or obsolete request received " + "(RequestId: %v, Service: %v, Method: %v, BodySize: %v, AttachmentSize: %v)", + requestId, + header.service(), + header.method(), + GetMessageBodySize(message), + GetTotalMessageAttachmentSize(message)); + + if (header.has_service()) { + const auto* counters = TClientRequestPerformanceProfiler::FindPerformanceCounters( + FromProto<std::string>(header.service()), + FromProto<std::string>(header.method())); + if (counters) { + TClientRequestPerformanceProfiler::ProfileReplyWithoutContext( + message, + counters, + /*recognized*/ false); + } + } + return; } diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp index 4ad9d45a4d..c9c4bc4b8d 100644 --- a/yt/yt/core/rpc/channel_detail.cpp +++ b/yt/yt/core/rpc/channel_detail.cpp @@ -162,9 +162,9 @@ TFuture<void> TClientRequestControlThunk::SendStreamingFeedback(const TStreaming //////////////////////////////////////////////////////////////////////////////// -struct TClientRequestPerformanceProfiler::TPerformanceCounters +struct TClientRequestPerformanceProfiler::TMethodPerformanceCounters { - TPerformanceCounters(const NProfiling::TProfiler& profiler) + TMethodPerformanceCounters(const NProfiling::TProfiler& profiler) : AckTimeCounter(profiler.Timer("/request_time/ack")) , ReplyTimeCounter(profiler.Timer("/request_time/reply")) , TimeoutTimeCounter(profiler.Timer("/request_time/timeout")) @@ -176,8 +176,18 @@ struct TClientRequestPerformanceProfiler::TPerformanceCounters , CancelledRequestCounter(profiler.Counter("/cancelled_request_count")) , RequestMessageBodySizeCounter(profiler.Counter("/request_message_body_bytes")) , RequestMessageAttachmentSizeCounter(profiler.Counter("/request_message_attachment_bytes")) - , ResponseMessageBodySizeCounter(profiler.Counter("/response_message_body_bytes")) - , ResponseMessageAttachmentSizeCounter(profiler.Counter("/response_message_attachment_bytes")) + , ResponseMessageBodySizeCounter(profiler + .WithTag("recognized", "true") + .Counter("/response_message_body_bytes")) + , ResponseMessageAttachmentSizeCounter(profiler + .WithTag("recognized", "true") + .Counter("/response_message_attachment_bytes")) + , UnrecognizedResponseMessageBodySizeCounter(profiler + .WithTag("recognized", "false") + .Counter("/response_message_body_bytes")) + , UnrecognizedResponseMessageAttachmentSizeCounter(profiler + .WithTag("recognized", "false") + .Counter("/response_message_attachment_bytes")) { } NProfiling::TEventTimer AckTimeCounter; @@ -194,20 +204,33 @@ struct TClientRequestPerformanceProfiler::TPerformanceCounters NProfiling::TCounter RequestMessageAttachmentSizeCounter; NProfiling::TCounter ResponseMessageBodySizeCounter; NProfiling::TCounter ResponseMessageAttachmentSizeCounter; + NProfiling::TCounter UnrecognizedResponseMessageBodySizeCounter; + NProfiling::TCounter UnrecognizedResponseMessageAttachmentSizeCounter; }; -auto TClientRequestPerformanceProfiler::GetPerformanceCounters( +const TClientRequestPerformanceProfiler::TMethodPerformanceCounters* +TClientRequestPerformanceProfiler::FindPerformanceCounters( std::string service, - std::string method) -> const TPerformanceCounters* + std::string method) { - using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TPerformanceCounters>; + using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TMethodPerformanceCounters>; + + return LeakySingleton<TCountersMap>()->Find(std::pair(service, method)); +} + +const TClientRequestPerformanceProfiler::TMethodPerformanceCounters* +TClientRequestPerformanceProfiler::GetPerformanceCounters( + std::string service, + std::string method) +{ + using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TMethodPerformanceCounters>; auto [counter, _] = LeakySingleton<TCountersMap>()->FindOrInsert(std::pair(service, method), [&] { auto profiler = RpcClientProfiler() .WithHot() .WithTag("yt_service", service) .WithTag("method", method, -1); - return TPerformanceCounters(profiler); + return TMethodPerformanceCounters(profiler); }); return counter; } @@ -226,8 +249,21 @@ void TClientRequestPerformanceProfiler::ProfileRequest(const TSharedRefArray& re void TClientRequestPerformanceProfiler::ProfileReply(const TSharedRefArray& responseMessage) { MethodCounters_->ReplyTimeCounter.Record(Timer_.GetElapsedTime()); - MethodCounters_->ResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage)); - MethodCounters_->ResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage)); + ProfileReplyWithoutContext(responseMessage, MethodCounters_, /*recognized*/ true); +} + +void TClientRequestPerformanceProfiler::ProfileReplyWithoutContext( + const TSharedRefArray& responseMessage, + const TMethodPerformanceCounters* counters, + bool recognized) +{ + if (recognized) { + counters->ResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage)); + counters->ResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage)); + } else { + counters->UnrecognizedResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage)); + counters->UnrecognizedResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage)); + } } void TClientRequestPerformanceProfiler::ProfileAcknowledgement() diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h index 63fc79d879..a4e958c079 100644 --- a/yt/yt/core/rpc/channel_detail.h +++ b/yt/yt/core/rpc/channel_detail.h @@ -93,13 +93,19 @@ public: TDuration ProfileComplete(); + struct TMethodPerformanceCounters; + static const TMethodPerformanceCounters* FindPerformanceCounters(std::string service, std::string method); + static const TMethodPerformanceCounters* GetPerformanceCounters(std::string service, std::string method); + static void ProfileReplyWithoutContext( + const TSharedRefArray& responseMessage, + const TMethodPerformanceCounters* counters, + bool recognized); + private: - struct TPerformanceCounters; + const TMethodPerformanceCounters* const MethodCounters_; - const TPerformanceCounters* const MethodCounters_; NProfiling::TWallTimer Timer_; - static const TPerformanceCounters* GetPerformanceCounters(std::string service, std::string method); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp index b6ea1fd384..f8b04fe147 100644 --- a/yt/yt/core/rpc/grpc/channel.cpp +++ b/yt/yt/core/rpc/grpc/channel.cpp @@ -629,6 +629,8 @@ private: NRpc::NProto::TResponseHeader responseHeader; ToProto(responseHeader.mutable_request_id(), Request_->GetRequestId()); + NYT::ToProto(responseHeader.mutable_service(), Request_->GetService()); + NYT::ToProto(responseHeader.mutable_method(), Request_->GetMethod()); if (Request_->Header().has_response_codec()) { responseHeader.set_codec(Request_->Header().response_codec()); } diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp index c4b5200c16..b656f00c2c 100644 --- a/yt/yt/core/rpc/http/channel.cpp +++ b/yt/yt/core/rpc/http/channel.cpp @@ -193,7 +193,12 @@ private: Response_ = client->Post(url, httpRequestBody, httpRequestHeaders); Response_.Subscribe( - BIND(&TCallHandler::OnResponse, channel->EndpointAddress_, request->GetRequestId(), std::move(responseHandler)) + BIND(&TCallHandler::OnResponse, + channel->EndpointAddress_, + request->GetRequestId(), + request->GetService(), + request->GetMethod(), + std::move(responseHandler)) .Via(NRpc::TDispatcher::Get()->GetHeavyInvoker())); } @@ -219,6 +224,8 @@ private: static void OnResponse( const TString& address, TRequestId requestId, + const std::string& service, + const std::string& method, const IClientResponseHandlerPtr& responseHandler, const TErrorOr<IResponsePtr>& responseOrError) { @@ -250,6 +257,8 @@ private: NRpc::NProto::TResponseHeader responseHeader; ToProto(responseHeader.mutable_request_id(), requestId); + NYT::ToProto(responseHeader.mutable_service(), service); + NYT::ToProto(responseHeader.mutable_method(), method); auto responseMessage = CreateResponseMessage( responseHeader, diff --git a/yt/yt/core/rpc/message.cpp b/yt/yt/core/rpc/message.cpp index a1790399d2..963b152515 100644 --- a/yt/yt/core/rpc/message.cpp +++ b/yt/yt/core/rpc/message.cpp @@ -181,6 +181,8 @@ TSharedRefArray CreateErrorResponseMessage( { NProto::TResponseHeader header; ToProto(header.mutable_request_id(), requestId); + // NB: We do not propagate service and method fields here because they are not necessary + // as this response message is empty and light anyway. if (!error.IsOK()) { ToProto(header.mutable_error(), error); } diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index ce9802a9dc..047033e005 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -206,6 +206,9 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage() ToProto(header.mutable_request_id(), RequestId_); ToProto(header.mutable_error(), Error_); + ToProto(header.mutable_service(), GetService()); + ToProto(header.mutable_method(), GetMethod()); + if (RequestHeader_->has_response_format()) { header.set_format(RequestHeader_->response_format()); } diff --git a/yt/yt_proto/yt/core/rpc/proto/rpc.proto b/yt/yt_proto/yt/core/rpc/proto/rpc.proto index 6cd11f1d9e..56ee1b154e 100644 --- a/yt/yt_proto/yt/core/rpc/proto/rpc.proto +++ b/yt/yt_proto/yt/core/rpc/proto/rpc.proto @@ -140,6 +140,10 @@ message TResponseHeader { optional NYT.NProto.TGuid request_id = 1; + // May be blank. + optional string service = 7; + optional string method = 8; + // If omitted then OK is assumed. optional NYT.NProto.TError error = 2; |