summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbulatman <[email protected]>2025-05-12 13:53:29 +0300
committerbulatman <[email protected]>2025-05-12 14:12:02 +0300
commit876ff1613ce0719ea89f5f5dfddbad6779d70d96 (patch)
tree806e2b6d475aa22788db55a7fa6f143569730a5e
parent7a941ebd252fd7442b4d1d34d31d72e971ad20bf (diff)
YT: Improve rpc error handling
Add option to use a separate logging level for error response Add option to extend original error in method OnMethodError commit_hash:01c0404ce7845e5bea13f3411cccc9f9811dd4f0
-rw-r--r--yt/yt/core/rpc/config.cpp2
-rw-r--r--yt/yt/core/rpc/config.h1
-rw-r--r--yt/yt/core/rpc/server_detail.cpp8
-rw-r--r--yt/yt/core/rpc/server_detail.h7
-rw-r--r--yt/yt/core/rpc/service_detail.cpp56
-rw-r--r--yt/yt/core/rpc/service_detail.h6
6 files changed, 60 insertions, 20 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index 2ddb306d6b9..2dbef036913 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -118,6 +118,8 @@ void TMethodConfig::Register(TRegistrar registrar)
.Optional();
registrar.Parameter("log_level", &TThis::LogLevel)
.Optional();
+ registrar.Parameter("error_log_level", &TThis::ErrorLogLevel)
+ .Optional();
registrar.Parameter("request_bytes_throttler", &TThis::RequestBytesThrottler)
.Default();
registrar.Parameter("request_weight_throttler", &TThis::RequestWeightThrottler)
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index 90db866eb81..e6a16bdb82a 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -150,6 +150,7 @@ struct TMethodConfig
std::optional<int> ConcurrencyLimit;
std::optional<i64> ConcurrencyByteLimit;
std::optional<NLogging::ELogLevel> LogLevel;
+ std::optional<NLogging::ELogLevel> ErrorLogLevel;
std::optional<TDuration> LoggingSuppressionTimeout;
NConcurrency::TThroughputThrottlerConfigPtr RequestBytesThrottler;
NConcurrency::TThroughputThrottlerConfigPtr RequestWeightThrottler;
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index fac02bff971..376abafd44a 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -33,13 +33,15 @@ TServiceContextBase::TServiceContextBase(
TMemoryUsageTrackerGuard memoryGuard,
IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
- NLogging::ELogLevel logLevel)
+ NLogging::ELogLevel logLevel,
+ std::optional<NLogging::ELogLevel> errorLogLevel)
: RequestHeader_(std::move(header))
, RequestMessage_(std::move(requestMessage))
, RequestMemoryGuard_(std::move(memoryGuard))
, MemoryUsageTracker_(std::move(memoryUsageTracker))
, Logger(std::move(logger))
, LogLevel_(logLevel)
+ , ErrorLogLevel_(errorLogLevel.value_or(logLevel))
{
Initialize();
}
@@ -49,13 +51,15 @@ TServiceContextBase::TServiceContextBase(
TMemoryUsageTrackerGuard memoryGuard,
IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
- NLogging::ELogLevel logLevel)
+ NLogging::ELogLevel logLevel,
+ std::optional<NLogging::ELogLevel> errorLogLevel)
: RequestHeader_(new TRequestHeader())
, RequestMessage_(std::move(requestMessage))
, RequestMemoryGuard_(std::move(memoryGuard))
, MemoryUsageTracker_(std::move(memoryUsageTracker))
, Logger(std::move(logger))
, LogLevel_(logLevel)
+ , ErrorLogLevel_(errorLogLevel.value_or(logLevel))
{
YT_VERIFY(TryParseRequestHeader(RequestMessage_, RequestHeader_.get()));
Initialize();
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index 0f43ff8c2e8..326d82c9ddf 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -125,6 +125,7 @@ protected:
const NLogging::TLogger Logger;
const NLogging::ELogLevel LogLevel_;
+ const NLogging::ELogLevel ErrorLogLevel_;
// Set in #Initialize.
bool LoggingEnabled_;
@@ -163,13 +164,15 @@ protected:
TMemoryUsageTrackerGuard memoryGuard,
IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
- NLogging::ELogLevel logLevel);
+ NLogging::ELogLevel logLevel,
+ std::optional<NLogging::ELogLevel> errorLogLevel = {});
TServiceContextBase(
TSharedRefArray requestMessage,
TMemoryUsageTrackerGuard memoryGuard,
IMemoryUsageTrackerPtr memoryUsageTracker,
NLogging::TLogger logger,
- NLogging::ELogLevel logLevel);
+ NLogging::ELogLevel logLevel,
+ std::optional<NLogging::ELogLevel> errorLogLevel = {});
virtual void DoReply() = 0;
virtual void DoFlush();
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index c374da2d7ad..1b0379c651d 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -199,6 +199,13 @@ auto TServiceBase::TMethodDescriptor::SetLogLevel(NLogging::ELogLevel value) con
return result;
}
+auto TServiceBase::TMethodDescriptor::SetErrorLogLevel(NLogging::ELogLevel value) const -> TMethodDescriptor
+{
+ auto result = *this;
+ result.ErrorLogLevel = value;
+ return result;
+}
+
auto TServiceBase::TMethodDescriptor::SetLoggingSuppressionTimeout(TDuration value) const -> TMethodDescriptor
{
auto result = *this;
@@ -346,7 +353,8 @@ public:
std::move(incomingRequest.MemoryGuard),
std::move(incomingRequest.MemoryUsageTracker),
std::move(logger),
- incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed))
+ incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed),
+ incomingRequest.RuntimeInfo->ErrorLogLevel.load(std::memory_order::relaxed))
, Service_(std::move(service))
, ReplyBus_(std::move(incomingRequest.ReplyBus))
, RuntimeInfo_(incomingRequest.RuntimeInfo)
@@ -879,13 +887,10 @@ private:
try {
TCurrentTraceContextGuard guard(TraceContext_);
DoGuardedRun(handler);
+ } catch (const TErrorException& ex) {
+ HandleError(ex.Error());
} catch (const std::exception& ex) {
- const auto& descriptor = RuntimeInfo_->Descriptor;
- if (descriptor.HandleMethodError) {
- Service_->OnMethodError(ex, descriptor.Method);
- }
-
- Reply(ex);
+ HandleError(ex);
}
}
@@ -958,6 +963,15 @@ private:
}
}
+ void HandleError(TError error)
+ {
+ const auto& descriptor = RuntimeInfo_->Descriptor;
+ if (descriptor.HandleMethodError) {
+ Service_->OnMethodError(&error, descriptor.Method);
+ }
+ Reply(error);
+ }
+
std::optional<TDuration> GetTraceContextTime() const override
{
if (TraceContext_) {
@@ -1161,10 +1175,10 @@ private:
if (TraceContext_ && TraceContext_->IsRecorded()) {
TraceContext_->AddTag(ResponseInfoAnnotation, logMessage);
}
- YT_LOG_EVENT_WITH_DYNAMIC_ANCHOR(Logger, LogLevel_, RuntimeInfo_->ResponseLoggingAnchor, logMessage);
+ auto logLevel = Error_.IsOK() ? LogLevel_ : ErrorLogLevel_;
+ YT_LOG_EVENT_WITH_DYNAMIC_ANCHOR(Logger, logLevel, RuntimeInfo_->ResponseLoggingAnchor, logMessage);
}
-
void CreateRequestAttachmentsStream()
{
auto guard = Guard(StreamsLock_);
@@ -1815,18 +1829,26 @@ void TServiceBase::ReplyError(TError error, TIncomingRequest&& incomingRequest)
<< TErrorAttribute("method", incomingRequest.Method)
<< TErrorAttribute("endpoint", incomingRequest.ReplyBus->GetEndpointDescription());
+ NLogging::ELogLevel logLevel = NLogging::ELogLevel::Debug;
+ if (incomingRequest.RuntimeInfo) {
+ logLevel = incomingRequest.RuntimeInfo->ErrorLogLevel;
+ const auto& descriptor = incomingRequest.RuntimeInfo->Descriptor;
+ if (descriptor.HandleMethodError) {
+ OnMethodError(&richError, descriptor.Method);
+ }
+ }
auto code = richError.GetCode();
- auto logLevel =
- code == NRpc::EErrorCode::NoSuchMethod || code == NRpc::EErrorCode::ProtocolError
- ? NLogging::ELogLevel::Warning
- : NLogging::ELogLevel::Debug;
+ if (code == NRpc::EErrorCode::NoSuchMethod || code == NRpc::EErrorCode::ProtocolError) {
+ logLevel = NLogging::ELogLevel::Warning;
+ }
+
YT_LOG_EVENT(Logger, logLevel, richError);
auto errorMessage = CreateErrorResponseMessage(incomingRequest.RequestId, richError);
YT_UNUSED_FUTURE(incomingRequest.ReplyBus->Send(errorMessage));
}
-void TServiceBase::OnMethodError(const TError& /*error*/, const TString& /*method*/)
+void TServiceBase::OnMethodError(TError* /*error*/, const TString& /*method*/)
{ }
void TServiceBase::OnRequestAuthenticated(
@@ -2575,6 +2597,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit);
runtimeInfo->ConcurrencyByteLimit.Reconfigure(descriptor.ConcurrencyByteLimit);
runtimeInfo->LogLevel.store(descriptor.LogLevel);
+ runtimeInfo->ErrorLogLevel.store(descriptor.ErrorLogLevel.value_or(descriptor.LogLevel));
runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout);
// Failure here means that such method is already registered.
@@ -2663,9 +2686,12 @@ void TServiceBase::DoConfigure(
runtimeInfo->QueueByteSizeLimit.store(methodConfig->QueueByteSizeLimit.value_or(descriptor.QueueByteSizeLimit));
runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
runtimeInfo->ConcurrencyByteLimit.Reconfigure(methodConfig->ConcurrencyByteLimit.value_or(descriptor.ConcurrencyByteLimit));
- runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel));
runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout));
runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled)));
+ auto logLevel = methodConfig->LogLevel.value_or(descriptor.LogLevel);
+ auto errorLogLevel = methodConfig->ErrorLogLevel.value_or(descriptor.ErrorLogLevel.value_or(logLevel));
+ runtimeInfo->LogLevel.store(logLevel);
+ runtimeInfo->ErrorLogLevel.store(errorLogLevel);
{
auto guard = Guard(runtimeInfo->RequestQueuesLock);
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index 4df20c158f7..3a7862bd08b 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -619,6 +619,8 @@ protected:
//! Log level for events emitted via |Set(Request|Response)Info|-like functions.
NLogging::ELogLevel LogLevel = NLogging::ELogLevel::Debug;
+ //! Log level for events emitted when method fails, by default |LogLevel| is used.
+ std::optional<NLogging::ELogLevel> ErrorLogLevel;
//! Logging suppression timeout for this method requests.
TDuration LoggingSuppressionTimeout = TDuration::Zero();
@@ -654,6 +656,7 @@ protected:
TMethodDescriptor SetConcurrencyByteLimit(i64 value) const;
TMethodDescriptor SetSystem(bool value) const;
TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const;
+ TMethodDescriptor SetErrorLogLevel(NLogging::ELogLevel value) const;
TMethodDescriptor SetLoggingSuppressionTimeout(TDuration value) const;
TMethodDescriptor SetCancelable(bool value) const;
TMethodDescriptor SetGenerateAttachmentChecksums(bool value) const;
@@ -764,6 +767,7 @@ protected:
NProfiling::TCounter UnauthenticatedRequestCounter;
std::atomic<NLogging::ELogLevel> LogLevel = {};
+ std::atomic<NLogging::ELogLevel> ErrorLogLevel = {};
std::atomic<TDuration> LoggingSuppressionTimeout = {};
using TNonowningPerformanceCountersKey = std::tuple<TStringBuf, TRequestQueue*>;
@@ -907,7 +911,7 @@ protected:
protected:
virtual void OnMethodError(
- const TError& error,
+ TError* error,
const TString& method);
private: