diff options
| author | bulatman <[email protected]> | 2025-05-12 13:53:29 +0300 |
|---|---|---|
| committer | bulatman <[email protected]> | 2025-05-12 14:12:02 +0300 |
| commit | 876ff1613ce0719ea89f5f5dfddbad6779d70d96 (patch) | |
| tree | 806e2b6d475aa22788db55a7fa6f143569730a5e | |
| parent | 7a941ebd252fd7442b4d1d34d31d72e971ad20bf (diff) | |
YT: Improve rpc error handling
Add option to use a separate logging level for error response
Add option to extend original error in method OnMethodError
commit_hash:01c0404ce7845e5bea13f3411cccc9f9811dd4f0
| -rw-r--r-- | yt/yt/core/rpc/config.cpp | 2 | ||||
| -rw-r--r-- | yt/yt/core/rpc/config.h | 1 | ||||
| -rw-r--r-- | yt/yt/core/rpc/server_detail.cpp | 8 | ||||
| -rw-r--r-- | yt/yt/core/rpc/server_detail.h | 7 | ||||
| -rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 56 | ||||
| -rw-r--r-- | yt/yt/core/rpc/service_detail.h | 6 |
6 files changed, 60 insertions, 20 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 2ddb306d6b9..2dbef036913 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -118,6 +118,8 @@ void TMethodConfig::Register(TRegistrar registrar) .Optional(); registrar.Parameter("log_level", &TThis::LogLevel) .Optional(); + registrar.Parameter("error_log_level", &TThis::ErrorLogLevel) + .Optional(); registrar.Parameter("request_bytes_throttler", &TThis::RequestBytesThrottler) .Default(); registrar.Parameter("request_weight_throttler", &TThis::RequestWeightThrottler) diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 90db866eb81..e6a16bdb82a 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -150,6 +150,7 @@ struct TMethodConfig std::optional<int> ConcurrencyLimit; std::optional<i64> ConcurrencyByteLimit; std::optional<NLogging::ELogLevel> LogLevel; + std::optional<NLogging::ELogLevel> ErrorLogLevel; std::optional<TDuration> LoggingSuppressionTimeout; NConcurrency::TThroughputThrottlerConfigPtr RequestBytesThrottler; NConcurrency::TThroughputThrottlerConfigPtr RequestWeightThrottler; diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index fac02bff971..376abafd44a 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -33,13 +33,15 @@ TServiceContextBase::TServiceContextBase( TMemoryUsageTrackerGuard memoryGuard, IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, - NLogging::ELogLevel logLevel) + NLogging::ELogLevel logLevel, + std::optional<NLogging::ELogLevel> errorLogLevel) : RequestHeader_(std::move(header)) , RequestMessage_(std::move(requestMessage)) , RequestMemoryGuard_(std::move(memoryGuard)) , MemoryUsageTracker_(std::move(memoryUsageTracker)) , Logger(std::move(logger)) , LogLevel_(logLevel) + , ErrorLogLevel_(errorLogLevel.value_or(logLevel)) { Initialize(); } @@ -49,13 +51,15 @@ TServiceContextBase::TServiceContextBase( TMemoryUsageTrackerGuard memoryGuard, IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, - NLogging::ELogLevel logLevel) + NLogging::ELogLevel logLevel, + std::optional<NLogging::ELogLevel> errorLogLevel) : RequestHeader_(new TRequestHeader()) , RequestMessage_(std::move(requestMessage)) , RequestMemoryGuard_(std::move(memoryGuard)) , MemoryUsageTracker_(std::move(memoryUsageTracker)) , Logger(std::move(logger)) , LogLevel_(logLevel) + , ErrorLogLevel_(errorLogLevel.value_or(logLevel)) { YT_VERIFY(TryParseRequestHeader(RequestMessage_, RequestHeader_.get())); Initialize(); diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index 0f43ff8c2e8..326d82c9ddf 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -125,6 +125,7 @@ protected: const NLogging::TLogger Logger; const NLogging::ELogLevel LogLevel_; + const NLogging::ELogLevel ErrorLogLevel_; // Set in #Initialize. bool LoggingEnabled_; @@ -163,13 +164,15 @@ protected: TMemoryUsageTrackerGuard memoryGuard, IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, - NLogging::ELogLevel logLevel); + NLogging::ELogLevel logLevel, + std::optional<NLogging::ELogLevel> errorLogLevel = {}); TServiceContextBase( TSharedRefArray requestMessage, TMemoryUsageTrackerGuard memoryGuard, IMemoryUsageTrackerPtr memoryUsageTracker, NLogging::TLogger logger, - NLogging::ELogLevel logLevel); + NLogging::ELogLevel logLevel, + std::optional<NLogging::ELogLevel> errorLogLevel = {}); virtual void DoReply() = 0; virtual void DoFlush(); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index c374da2d7ad..1b0379c651d 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -199,6 +199,13 @@ auto TServiceBase::TMethodDescriptor::SetLogLevel(NLogging::ELogLevel value) con return result; } +auto TServiceBase::TMethodDescriptor::SetErrorLogLevel(NLogging::ELogLevel value) const -> TMethodDescriptor +{ + auto result = *this; + result.ErrorLogLevel = value; + return result; +} + auto TServiceBase::TMethodDescriptor::SetLoggingSuppressionTimeout(TDuration value) const -> TMethodDescriptor { auto result = *this; @@ -346,7 +353,8 @@ public: std::move(incomingRequest.MemoryGuard), std::move(incomingRequest.MemoryUsageTracker), std::move(logger), - incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed)) + incomingRequest.RuntimeInfo->LogLevel.load(std::memory_order::relaxed), + incomingRequest.RuntimeInfo->ErrorLogLevel.load(std::memory_order::relaxed)) , Service_(std::move(service)) , ReplyBus_(std::move(incomingRequest.ReplyBus)) , RuntimeInfo_(incomingRequest.RuntimeInfo) @@ -879,13 +887,10 @@ private: try { TCurrentTraceContextGuard guard(TraceContext_); DoGuardedRun(handler); + } catch (const TErrorException& ex) { + HandleError(ex.Error()); } catch (const std::exception& ex) { - const auto& descriptor = RuntimeInfo_->Descriptor; - if (descriptor.HandleMethodError) { - Service_->OnMethodError(ex, descriptor.Method); - } - - Reply(ex); + HandleError(ex); } } @@ -958,6 +963,15 @@ private: } } + void HandleError(TError error) + { + const auto& descriptor = RuntimeInfo_->Descriptor; + if (descriptor.HandleMethodError) { + Service_->OnMethodError(&error, descriptor.Method); + } + Reply(error); + } + std::optional<TDuration> GetTraceContextTime() const override { if (TraceContext_) { @@ -1161,10 +1175,10 @@ private: if (TraceContext_ && TraceContext_->IsRecorded()) { TraceContext_->AddTag(ResponseInfoAnnotation, logMessage); } - YT_LOG_EVENT_WITH_DYNAMIC_ANCHOR(Logger, LogLevel_, RuntimeInfo_->ResponseLoggingAnchor, logMessage); + auto logLevel = Error_.IsOK() ? LogLevel_ : ErrorLogLevel_; + YT_LOG_EVENT_WITH_DYNAMIC_ANCHOR(Logger, logLevel, RuntimeInfo_->ResponseLoggingAnchor, logMessage); } - void CreateRequestAttachmentsStream() { auto guard = Guard(StreamsLock_); @@ -1815,18 +1829,26 @@ void TServiceBase::ReplyError(TError error, TIncomingRequest&& incomingRequest) << TErrorAttribute("method", incomingRequest.Method) << TErrorAttribute("endpoint", incomingRequest.ReplyBus->GetEndpointDescription()); + NLogging::ELogLevel logLevel = NLogging::ELogLevel::Debug; + if (incomingRequest.RuntimeInfo) { + logLevel = incomingRequest.RuntimeInfo->ErrorLogLevel; + const auto& descriptor = incomingRequest.RuntimeInfo->Descriptor; + if (descriptor.HandleMethodError) { + OnMethodError(&richError, descriptor.Method); + } + } auto code = richError.GetCode(); - auto logLevel = - code == NRpc::EErrorCode::NoSuchMethod || code == NRpc::EErrorCode::ProtocolError - ? NLogging::ELogLevel::Warning - : NLogging::ELogLevel::Debug; + if (code == NRpc::EErrorCode::NoSuchMethod || code == NRpc::EErrorCode::ProtocolError) { + logLevel = NLogging::ELogLevel::Warning; + } + YT_LOG_EVENT(Logger, logLevel, richError); auto errorMessage = CreateErrorResponseMessage(incomingRequest.RequestId, richError); YT_UNUSED_FUTURE(incomingRequest.ReplyBus->Send(errorMessage)); } -void TServiceBase::OnMethodError(const TError& /*error*/, const TString& /*method*/) +void TServiceBase::OnMethodError(TError* /*error*/, const TString& /*method*/) { } void TServiceBase::OnRequestAuthenticated( @@ -2575,6 +2597,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit); runtimeInfo->ConcurrencyByteLimit.Reconfigure(descriptor.ConcurrencyByteLimit); runtimeInfo->LogLevel.store(descriptor.LogLevel); + runtimeInfo->ErrorLogLevel.store(descriptor.ErrorLogLevel.value_or(descriptor.LogLevel)); runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout); // Failure here means that such method is already registered. @@ -2663,9 +2686,12 @@ void TServiceBase::DoConfigure( runtimeInfo->QueueByteSizeLimit.store(methodConfig->QueueByteSizeLimit.value_or(descriptor.QueueByteSizeLimit)); runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); runtimeInfo->ConcurrencyByteLimit.Reconfigure(methodConfig->ConcurrencyByteLimit.value_or(descriptor.ConcurrencyByteLimit)); - runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel)); runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout)); runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled))); + auto logLevel = methodConfig->LogLevel.value_or(descriptor.LogLevel); + auto errorLogLevel = methodConfig->ErrorLogLevel.value_or(descriptor.ErrorLogLevel.value_or(logLevel)); + runtimeInfo->LogLevel.store(logLevel); + runtimeInfo->ErrorLogLevel.store(errorLogLevel); { auto guard = Guard(runtimeInfo->RequestQueuesLock); diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index 4df20c158f7..3a7862bd08b 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -619,6 +619,8 @@ protected: //! Log level for events emitted via |Set(Request|Response)Info|-like functions. NLogging::ELogLevel LogLevel = NLogging::ELogLevel::Debug; + //! Log level for events emitted when method fails, by default |LogLevel| is used. + std::optional<NLogging::ELogLevel> ErrorLogLevel; //! Logging suppression timeout for this method requests. TDuration LoggingSuppressionTimeout = TDuration::Zero(); @@ -654,6 +656,7 @@ protected: TMethodDescriptor SetConcurrencyByteLimit(i64 value) const; TMethodDescriptor SetSystem(bool value) const; TMethodDescriptor SetLogLevel(NLogging::ELogLevel value) const; + TMethodDescriptor SetErrorLogLevel(NLogging::ELogLevel value) const; TMethodDescriptor SetLoggingSuppressionTimeout(TDuration value) const; TMethodDescriptor SetCancelable(bool value) const; TMethodDescriptor SetGenerateAttachmentChecksums(bool value) const; @@ -764,6 +767,7 @@ protected: NProfiling::TCounter UnauthenticatedRequestCounter; std::atomic<NLogging::ELogLevel> LogLevel = {}; + std::atomic<NLogging::ELogLevel> ErrorLogLevel = {}; std::atomic<TDuration> LoggingSuppressionTimeout = {}; using TNonowningPerformanceCountersKey = std::tuple<TStringBuf, TRequestQueue*>; @@ -907,7 +911,7 @@ protected: protected: virtual void OnMethodError( - const TError& error, + TError* error, const TString& method); private: |
