diff options
author | ermolovd <ermolovd@yandex-team.com> | 2024-11-19 10:54:24 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2024-11-19 11:06:45 +0300 |
commit | 7be946f684e606f5baa9e3a13401cc19b8ac9a3b (patch) | |
tree | a4d6ce2a580ba953a048a64bdedbda1f6c7a1d26 /yt/cpp/mapreduce/http/http.cpp | |
parent | 907667c6ffd6222845fe8e18fd548e508250b5d0 (diff) | |
download | ydb-7be946f684e606f5baa9e3a13401cc19b8ac9a3b.tar.gz |
YT-22943: add context for system errors
* thrown system errors now contain context (i.e. the host, method, and request id that produced the error)
Type: feature
Component: cpp-sdk
commit_hash:af72a3a37785e9e373e816c2cc072df2076f821d
Diffstat (limited to 'yt/cpp/mapreduce/http/http.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 223 |
1 files changed, 150 insertions, 73 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 12aa33ff29..ca243a929a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/http/http.h> @@ -39,6 +40,27 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +std::exception_ptr WrapSystemError( + const TRequestContext& context, + const std::exception& ex) +{ + if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) { + return std::make_exception_ptr(errorResponse); + } + + auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method); + TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, { + {"request_id", context.RequestId}, + {"host", context.HostName}, + {"method", context.Method}, + }); + TTransportError errorResponse(std::move(outer)); + + return std::make_exception_ptr(errorResponse); +} + +//////////////////////////////////////////////////////////////////////////////// + class THttpRequest::TRequestStream : public IOutputStream { @@ -92,17 +114,17 @@ private: CheckErrorState(); try { func(); - } catch (const std::exception&) { - HandleWriteException(); + } catch (const std::exception& ex) { + HandleWriteException(ex); } } // In many cases http proxy stops reading request and resets connection // if error has happend. This function tries to read error response // in such cases. 
- void HandleWriteException() { + void HandleWriteException(const std::exception& ex) { Y_ABORT_UNLESS(WriteError_ == nullptr); - WriteError_ = std::current_exception(); + WriteError_ = WrapSystemError(HttpRequest_->Context_, ex); Y_ABORT_UNLESS(WriteError_ != nullptr); try { HttpRequest_->GetResponseStream(); @@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) //////////////////////////////////////////////////////////////////////////////// -static TMaybe<TString> GetProxyName(const THttpInput& input) +class THttpResponse::THttpInputWrapped + : public IInputStream { - if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { - return proxyHeader->Value(); +public: + explicit THttpInputWrapped(TRequestContext context, IInputStream* input) + : Context_(std::move(context)) + , HttpInput_(input) + { } + + const THttpHeaders& Headers() const noexcept + { + return HttpInput_.Headers(); + } + + const TString& FirstLine() const noexcept + { + return HttpInput_.FirstLine(); } - return Nothing(); -} + + bool IsKeepAlive() const noexcept + { + return HttpInput_.IsKeepAlive(); + } + + const TMaybe<THttpHeaders>& Trailers() const noexcept + { + return HttpInput_.Trailers(); + } + +private: + size_t DoRead(void* buf, size_t len) override + { + try { + return HttpInput_.Read(buf, len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + + size_t DoSkip(size_t len) override + { + try { + return HttpInput_.Skip(len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + +private: + const TRequestContext Context_; + THttpInput HttpInput_; +}; THttpResponse::THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName) - : RequestId_(requestId) - , HttpInput_(socketStream) - , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) - , 
Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) + TRequestContext context, + IInputStream* socketStream) + : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) + , Context_(std::move(context)) { - HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) { + Context_.HostName = proxyHeader->Value(); + } + HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine()); if (HttpCode_ == 200 || HttpCode_ == 202) { return; } - ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); auto logAndSetError = [&] (const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", - RequestId_, + Context_.RequestId, HttpCode_, rawError.data()); ErrorResponse_->SetRawError(rawError); @@ -705,26 +775,26 @@ THttpResponse::THttpResponse( break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { TStringStream httpHeaders; httpHeaders << "HTTP headers ("; - for (const auto& header : HttpInput_.Headers()) { + for (const auto& header : HttpInput_->Headers()) { httpHeaders << header.Name() << ": " << header.Value() << "; "; } httpHeaders << ")"; auto errorString = Sprintf("RSP %s - HTTP %d - %s", - RequestId_.data(), + Context_.RequestId.data(), HttpCode_, httpHeaders.Str().data()); YT_LOG_ERROR("%v", errorString.data()); - if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); } else { ErrorResponse_->SetRawError( @@ -735,9 +805,12 @@ THttpResponse::THttpResponse( } } +THttpResponse::~THttpResponse() +{ } + const THttpHeaders& THttpResponse::Headers() const { - return HttpInput_.Headers(); + return HttpInput_->Headers(); } 
void THttpResponse::CheckErrorResponse() const @@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const const TString& THttpResponse::GetHostName() const { - return HostName_; + return Context_.HostName; } bool THttpResponse::IsKeepAlive() const { - return HttpInput_.IsKeepAlive(); + return HttpInput_->IsKeepAlive(); } TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) { for (const auto& header : headers) { if (header.Name() == "X-YT-Error") { - TErrorResponse errorResponse(HttpCode_, RequestId_); + TErrorResponse errorResponse(HttpCode_, Context_.RequestId); errorResponse.ParseFromJsonError(header.Value()); if (errorResponse.IsOk()) { return Nothing(); @@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len) if (Unframe_) { read = UnframeRead(buf, len); } else { - read = HttpInput_.Read(buf, len); + read = HttpInput_->Read(buf, len); } if (read == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return read; @@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len) if (Unframe_) { skipped = UnframeSkip(len); } else { - skipped = HttpInput_.Skip(len); + skipped = HttpInput_->Skip(len); } if (skipped == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. 
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return skipped; @@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers) if (auto errorResponse = ParseError(trailers)) { errorResponse->SetIsFromTrailers(true); YT_LOG_ERROR("RSP %v - %v", - RequestId_, + Context_.RequestId, errorResponse.GetRef().what()); ythrow errorResponse.GetRef(); } } -static ui32 ReadDataFrameSize(THttpInput* stream) +static ui32 ReadDataFrameSize(IInputStream* stream) { ui32 littleEndianSize; auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); @@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary() { while (RemainingFrameSize_ == 0) { ui8 frameTypeByte; - auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte)); if (read == 0) { return false; } @@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= read; return read; } @@ -879,19 +952,22 @@ size_t THttpResponse::UnframeSkip(size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= 
skipped; return skipped; } //////////////////////////////////////////////////////////////////////////////// -THttpRequest::THttpRequest() - : RequestId_(CreateGuidAsString()) -{ } - -THttpRequest::THttpRequest(const TString& requestId) - : RequestId_(requestId) +THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout) + : Context_(TRequestContext{ + .RequestId = std::move(requestId), + .HostName = std::move(hostName), + .Method = header.GetUrl(/*needProxy=*/ false) + }) + , Header_(std::move(header)) + , Url_(Header_.GetUrl(true)) + , SocketTimeout_(socketTimeout) { } THttpRequest::~THttpRequest() @@ -905,40 +981,41 @@ THttpRequest::~THttpRequest() // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName_, Connection_); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId_; + return Context_.RequestId; } -void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) { - HostName_ = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId_, - HostName_); + Context_.RequestId, + Context_.HostName); StartTime_ = TInstant::Now(); - Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout); + + try { + Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId_, + Context_.RequestId, Connection_->Id); -} -IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) -{ - auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, 
includeParameters); - Url_ = header.GetUrl(true); + auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters); - LogRequest(header, Url_, includeParameters, RequestId_, HostName_); + LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName); - LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128); - auto outputFormat = header.GetOutputFormat(); + auto outputFormat = Header_.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { LogResponse_ = true; } @@ -949,9 +1026,9 @@ IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool in return RequestStream_.Get(); } -IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +IOutputStream* THttpRequest::StartRequest() { - return StartRequestImpl(header, true); + return StartRequestImpl(true); } void THttpRequest::FinishRequest() @@ -960,16 +1037,16 @@ void THttpRequest::FinishRequest() RequestStream_->Finish(); } -void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) { - if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { - const auto& parameters = header.GetParameters(); + if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) { + const auto& parameters = Header_.GetParameters(); auto parametersStr = NodeToYsonString(parameters); - auto* output = StartRequestImpl(header, false); + auto* output = StartRequestImpl(false); output->Write(parametersStr); FinishRequest(); } else { - auto* output = StartRequest(header); + auto* output = StartRequest(); if (body) { output->Write(*body); } @@ -983,9 +1060,9 @@ THttpResponse* THttpRequest::GetResponseStream() SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { 
Y_ABORT_UNLESS(!Url_.empty()); - Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_)); + Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); } else { - Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_)); + Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); } Input_->CheckErrorResponse(); } @@ -1005,12 +1082,12 @@ TString THttpRequest::GetResponse() if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId_, + Context_.RequestId, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId_, + Context_.RequestId, result.size(), loggedAttributes.Str()); } @@ -1023,7 +1100,7 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName_, Connection_); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); Connection_.Reset(); } |