aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <akozhikhov@yandex-team.com>2024-12-02 00:06:12 +0300
committerakozhikhov <akozhikhov@yandex-team.com>2024-12-02 00:16:28 +0300
commitae26d2770a2299c4729defed61d92dfbf8bd1bf8 (patch)
treec4fcf0d7811b1704ac04cced2e74c163b817c60b
parent279ff59b5bf4c2a75b36c0f9574e11cefef1c996 (diff)
downloadydb-ae26d2770a2299c4729defed61d92dfbf8bd1bf8.tar.gz
YT-23393: Do not forget to consider response for timed-out request in rpc sensors
commit_hash:732e9977c555b805ace8f6961d33dc1dae9d3efe
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp32
-rw-r--r--yt/yt/core/rpc/channel_detail.cpp56
-rw-r--r--yt/yt/core/rpc/channel_detail.h12
-rw-r--r--yt/yt/core/rpc/grpc/channel.cpp2
-rw-r--r--yt/yt/core/rpc/http/channel.cpp11
-rw-r--r--yt/yt/core/rpc/message.cpp2
-rw-r--r--yt/yt/core/rpc/server_detail.cpp3
-rw-r--r--yt/yt_proto/yt/core/rpc/proto/rpc.proto4
8 files changed, 104 insertions, 18 deletions
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index 9edfe97ecb..322d1347fc 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -897,16 +897,40 @@ private:
auto guard = Guard(*bucket);
if (bucket->Terminated) {
- YT_LOG_WARNING("Response received via a terminated channel (RequestId: %v)",
- requestId);
+ YT_LOG_WARNING("Response received via a terminated channel "
+ "(RequestId: %v, Service: %v, Method: %v, BodySize: %v, AttachmentSize: %v)",
+ requestId,
+ header.service(),
+ header.method(),
+ GetMessageBodySize(message),
+ GetTotalMessageAttachmentSize(message));
+
return;
}
auto it = bucket->ActiveRequestMap.find(requestId);
if (it == bucket->ActiveRequestMap.end()) {
// This may happen when the other party responds to an already timed-out request.
- YT_LOG_DEBUG("Response for an incorrect or obsolete request received (RequestId: %v)",
- requestId);
+ YT_LOG_DEBUG("Response for an incorrect or obsolete request received "
+ "(RequestId: %v, Service: %v, Method: %v, BodySize: %v, AttachmentSize: %v)",
+ requestId,
+ header.service(),
+ header.method(),
+ GetMessageBodySize(message),
+ GetTotalMessageAttachmentSize(message));
+
+ if (header.has_service()) {
+ const auto* counters = TClientRequestPerformanceProfiler::FindPerformanceCounters(
+ FromProto<std::string>(header.service()),
+ FromProto<std::string>(header.method()));
+ if (counters) {
+ TClientRequestPerformanceProfiler::ProfileReplyWithoutContext(
+ message,
+ counters,
+ /*recognized*/ false);
+ }
+ }
+
return;
}
diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp
index 4ad9d45a4d..c9c4bc4b8d 100644
--- a/yt/yt/core/rpc/channel_detail.cpp
+++ b/yt/yt/core/rpc/channel_detail.cpp
@@ -162,9 +162,9 @@ TFuture<void> TClientRequestControlThunk::SendStreamingFeedback(const TStreaming
////////////////////////////////////////////////////////////////////////////////
-struct TClientRequestPerformanceProfiler::TPerformanceCounters
+struct TClientRequestPerformanceProfiler::TMethodPerformanceCounters
{
- TPerformanceCounters(const NProfiling::TProfiler& profiler)
+ TMethodPerformanceCounters(const NProfiling::TProfiler& profiler)
: AckTimeCounter(profiler.Timer("/request_time/ack"))
, ReplyTimeCounter(profiler.Timer("/request_time/reply"))
, TimeoutTimeCounter(profiler.Timer("/request_time/timeout"))
@@ -176,8 +176,18 @@ struct TClientRequestPerformanceProfiler::TPerformanceCounters
, CancelledRequestCounter(profiler.Counter("/cancelled_request_count"))
, RequestMessageBodySizeCounter(profiler.Counter("/request_message_body_bytes"))
, RequestMessageAttachmentSizeCounter(profiler.Counter("/request_message_attachment_bytes"))
- , ResponseMessageBodySizeCounter(profiler.Counter("/response_message_body_bytes"))
- , ResponseMessageAttachmentSizeCounter(profiler.Counter("/response_message_attachment_bytes"))
+ , ResponseMessageBodySizeCounter(profiler
+ .WithTag("recognized", "true")
+ .Counter("/response_message_body_bytes"))
+ , ResponseMessageAttachmentSizeCounter(profiler
+ .WithTag("recognized", "true")
+ .Counter("/response_message_attachment_bytes"))
+ , UnrecognizedResponseMessageBodySizeCounter(profiler
+ .WithTag("recognized", "false")
+ .Counter("/response_message_body_bytes"))
+ , UnrecognizedResponseMessageAttachmentSizeCounter(profiler
+ .WithTag("recognized", "false")
+ .Counter("/response_message_attachment_bytes"))
{ }
NProfiling::TEventTimer AckTimeCounter;
@@ -194,20 +204,33 @@ struct TClientRequestPerformanceProfiler::TPerformanceCounters
NProfiling::TCounter RequestMessageAttachmentSizeCounter;
NProfiling::TCounter ResponseMessageBodySizeCounter;
NProfiling::TCounter ResponseMessageAttachmentSizeCounter;
+ NProfiling::TCounter UnrecognizedResponseMessageBodySizeCounter;
+ NProfiling::TCounter UnrecognizedResponseMessageAttachmentSizeCounter;
};
-auto TClientRequestPerformanceProfiler::GetPerformanceCounters(
+const TClientRequestPerformanceProfiler::TMethodPerformanceCounters*
+TClientRequestPerformanceProfiler::FindPerformanceCounters(
std::string service,
- std::string method) -> const TPerformanceCounters*
+ std::string method)
{
- using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TPerformanceCounters>;
+ using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TMethodPerformanceCounters>;
+
+ return LeakySingleton<TCountersMap>()->Find(std::pair(service, method));
+}
+
+const TClientRequestPerformanceProfiler::TMethodPerformanceCounters*
+TClientRequestPerformanceProfiler::GetPerformanceCounters(
+ std::string service,
+ std::string method)
+{
+ using TCountersMap = NConcurrency::TSyncMap<std::pair<std::string, std::string>, TMethodPerformanceCounters>;
auto [counter, _] = LeakySingleton<TCountersMap>()->FindOrInsert(std::pair(service, method), [&] {
auto profiler = RpcClientProfiler()
.WithHot()
.WithTag("yt_service", service)
.WithTag("method", method, -1);
- return TPerformanceCounters(profiler);
+ return TMethodPerformanceCounters(profiler);
});
return counter;
}
@@ -226,8 +249,21 @@ void TClientRequestPerformanceProfiler::ProfileRequest(const TSharedRefArray& re
void TClientRequestPerformanceProfiler::ProfileReply(const TSharedRefArray& responseMessage)
{
MethodCounters_->ReplyTimeCounter.Record(Timer_.GetElapsedTime());
- MethodCounters_->ResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage));
- MethodCounters_->ResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage));
+ ProfileReplyWithoutContext(responseMessage, MethodCounters_, /*recognized*/ true);
+}
+
+void TClientRequestPerformanceProfiler::ProfileReplyWithoutContext(
+ const TSharedRefArray& responseMessage,
+ const TMethodPerformanceCounters* counters,
+ bool recognized)
+{
+ if (recognized) {
+ counters->ResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage));
+ counters->ResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage));
+ } else {
+ counters->UnrecognizedResponseMessageBodySizeCounter.Increment(GetMessageBodySize(responseMessage));
+ counters->UnrecognizedResponseMessageAttachmentSizeCounter.Increment(GetTotalMessageAttachmentSize(responseMessage));
+ }
}
void TClientRequestPerformanceProfiler::ProfileAcknowledgement()
diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h
index 63fc79d879..a4e958c079 100644
--- a/yt/yt/core/rpc/channel_detail.h
+++ b/yt/yt/core/rpc/channel_detail.h
@@ -93,13 +93,19 @@ public:
TDuration ProfileComplete();
+ struct TMethodPerformanceCounters;
+ static const TMethodPerformanceCounters* FindPerformanceCounters(std::string service, std::string method);
+ static const TMethodPerformanceCounters* GetPerformanceCounters(std::string service, std::string method);
+ static void ProfileReplyWithoutContext(
+ const TSharedRefArray& responseMessage,
+ const TMethodPerformanceCounters* counters,
+ bool recognized);
+
private:
- struct TPerformanceCounters;
+ const TMethodPerformanceCounters* const MethodCounters_;
- const TPerformanceCounters* const MethodCounters_;
NProfiling::TWallTimer Timer_;
- static const TPerformanceCounters* GetPerformanceCounters(std::string service, std::string method);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp
index b6ea1fd384..f8b04fe147 100644
--- a/yt/yt/core/rpc/grpc/channel.cpp
+++ b/yt/yt/core/rpc/grpc/channel.cpp
@@ -629,6 +629,8 @@ private:
NRpc::NProto::TResponseHeader responseHeader;
ToProto(responseHeader.mutable_request_id(), Request_->GetRequestId());
+ NYT::ToProto(responseHeader.mutable_service(), Request_->GetService());
+ NYT::ToProto(responseHeader.mutable_method(), Request_->GetMethod());
if (Request_->Header().has_response_codec()) {
responseHeader.set_codec(Request_->Header().response_codec());
}
diff --git a/yt/yt/core/rpc/http/channel.cpp b/yt/yt/core/rpc/http/channel.cpp
index c4b5200c16..b656f00c2c 100644
--- a/yt/yt/core/rpc/http/channel.cpp
+++ b/yt/yt/core/rpc/http/channel.cpp
@@ -193,7 +193,12 @@ private:
Response_ = client->Post(url, httpRequestBody, httpRequestHeaders);
Response_.Subscribe(
- BIND(&TCallHandler::OnResponse, channel->EndpointAddress_, request->GetRequestId(), std::move(responseHandler))
+ BIND(&TCallHandler::OnResponse,
+ channel->EndpointAddress_,
+ request->GetRequestId(),
+ request->GetService(),
+ request->GetMethod(),
+ std::move(responseHandler))
.Via(NRpc::TDispatcher::Get()->GetHeavyInvoker()));
}
@@ -219,6 +224,8 @@ private:
static void OnResponse(
const TString& address,
TRequestId requestId,
+ const std::string& service,
+ const std::string& method,
const IClientResponseHandlerPtr& responseHandler,
const TErrorOr<IResponsePtr>& responseOrError)
{
@@ -250,6 +257,8 @@ private:
NRpc::NProto::TResponseHeader responseHeader;
ToProto(responseHeader.mutable_request_id(), requestId);
+ NYT::ToProto(responseHeader.mutable_service(), service);
+ NYT::ToProto(responseHeader.mutable_method(), method);
auto responseMessage = CreateResponseMessage(
responseHeader,
diff --git a/yt/yt/core/rpc/message.cpp b/yt/yt/core/rpc/message.cpp
index a1790399d2..963b152515 100644
--- a/yt/yt/core/rpc/message.cpp
+++ b/yt/yt/core/rpc/message.cpp
@@ -181,6 +181,8 @@ TSharedRefArray CreateErrorResponseMessage(
{
NProto::TResponseHeader header;
ToProto(header.mutable_request_id(), requestId);
+ // NB: We do not propagate service and method fields here because they are not necessary
+ // as this response message is empty and light anyway.
if (!error.IsOK()) {
ToProto(header.mutable_error(), error);
}
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index ce9802a9dc..047033e005 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -206,6 +206,9 @@ TSharedRefArray TServiceContextBase::BuildResponseMessage()
ToProto(header.mutable_request_id(), RequestId_);
ToProto(header.mutable_error(), Error_);
+ ToProto(header.mutable_service(), GetService());
+ ToProto(header.mutable_method(), GetMethod());
+
if (RequestHeader_->has_response_format()) {
header.set_format(RequestHeader_->response_format());
}
diff --git a/yt/yt_proto/yt/core/rpc/proto/rpc.proto b/yt/yt_proto/yt/core/rpc/proto/rpc.proto
index 6cd11f1d9e..56ee1b154e 100644
--- a/yt/yt_proto/yt/core/rpc/proto/rpc.proto
+++ b/yt/yt_proto/yt/core/rpc/proto/rpc.proto
@@ -140,6 +140,10 @@ message TResponseHeader
{
optional NYT.NProto.TGuid request_id = 1;
+ // May be blank.
+ optional string service = 7;
+ optional string method = 8;
+
// If omitted then OK is assumed.
optional NYT.NProto.TError error = 2;