diff options
author | ermolovd <ermolovd@yandex-team.com> | 2024-11-19 10:54:24 +0300 |
---|---|---|
committer | ermolovd <ermolovd@yandex-team.com> | 2024-11-19 11:06:45 +0300 |
commit | 7be946f684e606f5baa9e3a13401cc19b8ac9a3b (patch) | |
tree | a4d6ce2a580ba953a048a64bdedbda1f6c7a1d26 /yt/cpp/mapreduce | |
parent | 907667c6ffd6222845fe8e18fd548e508250b5d0 (diff) | |
download | ydb-7be946f684e606f5baa9e3a13401cc19b8ac9a3b.tar.gz |
YT-22943: add context for system errors
* thrown system errors contain context (i.e. host method and request id that produced error)
Type: feature
Component: cpp-sdk
commit_hash:af72a3a37785e9e373e816c2cc072df2076f821d
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.cpp | 13 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 223 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 38 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/requests.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp | 21 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/ut/http_ut.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/errors.cpp | 24 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/errors.h | 19 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 1 |
12 files changed, 223 insertions, 144 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp index 9da9241d33..995bb9de4c 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.cpp +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -14,20 +14,20 @@ public: { auto g = Guard(Lock_); auto id = NextId_++; - IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); + IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); return id; } void StopOutage(TOutageId id) { auto g = Guard(Lock_); - IdToOutage.erase(id); + IdToOutage_.erase(id); } void Add(IAbortableHttpResponse* response) { auto g = Guard(Lock_); - for (auto& [id, entry] : IdToOutage) { + for (auto& [id, entry] : IdToOutage_) { if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) { response->SetLengthLimit(entry.LengthLimit); entry.Counter -= 1; @@ -70,7 +70,7 @@ private: private: TOutageId NextId_ = 0; TIntrusiveList<IAbortableHttpResponse> ResponseList_; - THashMap<TOutageId, TOutageEntry> IdToOutage; + THashMap<TOutageId, TOutageEntry> IdToOutage_; TMutex Lock_; }; @@ -137,11 +137,10 @@ bool TAbortableHttpResponseBase::IsAborted() const //////////////////////////////////////////////////////////////////////////////// TAbortableHttpResponse::TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url) - : THttpResponse(socketStream, requestId, hostName) + : THttpResponse(std::move(context), socketStream) , TAbortableHttpResponseBase(url) { } diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h index d72bcfa0a6..e9b1483bf7 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.h +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -108,9 +108,8 @@ public: public: TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url); /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url). diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 12aa33ff29..ca243a929a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/http/http.h> @@ -39,6 +40,27 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +std::exception_ptr WrapSystemError( + const TRequestContext& context, + const std::exception& ex) +{ + if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) { + return std::make_exception_ptr(errorResponse); + } + + auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method); + TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, { + {"request_id", context.RequestId}, + {"host", context.HostName}, + {"method", context.Method}, + }); + TTransportError errorResponse(std::move(outer)); + + return std::make_exception_ptr(errorResponse); +} + +//////////////////////////////////////////////////////////////////////////////// + class THttpRequest::TRequestStream : public IOutputStream { @@ -92,17 +114,17 @@ private: CheckErrorState(); try { func(); - } catch (const std::exception&) { - HandleWriteException(); + } catch (const std::exception& ex) { + HandleWriteException(ex); } } // In many cases http proxy stops reading request and resets connection // if error has happend. This function tries to read error response // in such cases. - void HandleWriteException() { + void HandleWriteException(const std::exception& ex) { Y_ABORT_UNLESS(WriteError_ == nullptr); - WriteError_ = std::current_exception(); + WriteError_ = WrapSystemError(HttpRequest_->Context_, ex); Y_ABORT_UNLESS(WriteError_ != nullptr); try { HttpRequest_->GetResponseStream(); @@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) //////////////////////////////////////////////////////////////////////////////// -static TMaybe<TString> GetProxyName(const THttpInput& input) +class THttpResponse::THttpInputWrapped + : public IInputStream { - if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { - return proxyHeader->Value(); +public: + explicit THttpInputWrapped(TRequestContext context, IInputStream* input) + : Context_(std::move(context)) + , HttpInput_(input) + { } + + const THttpHeaders& Headers() const noexcept + { + return HttpInput_.Headers(); + } + + const TString& FirstLine() const noexcept + { + return HttpInput_.FirstLine(); } - return Nothing(); -} + + bool IsKeepAlive() const noexcept + { + return HttpInput_.IsKeepAlive(); + } + + const TMaybe<THttpHeaders>& Trailers() const noexcept + { + return HttpInput_.Trailers(); + } + +private: + size_t DoRead(void* buf, size_t len) override + { + try { + return HttpInput_.Read(buf, len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + + size_t DoSkip(size_t len) override + { + try { + return HttpInput_.Skip(len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + +private: + const TRequestContext Context_; + THttpInput HttpInput_; +}; THttpResponse::THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName) - : RequestId_(requestId) - , HttpInput_(socketStream) - , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) - , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) + TRequestContext context, + IInputStream* socketStream) + : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) + , Context_(std::move(context)) { - HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) { + Context_.HostName = proxyHeader->Value(); + } + HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine()); if (HttpCode_ == 200 || HttpCode_ == 202) { return; } - ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); auto logAndSetError = [&] (const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", - RequestId_, + Context_.RequestId, HttpCode_, rawError.data()); ErrorResponse_->SetRawError(rawError); @@ -705,26 +775,26 @@ THttpResponse::THttpResponse( break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { TStringStream httpHeaders; httpHeaders << "HTTP headers ("; - for (const auto& header : HttpInput_.Headers()) { + for (const auto& header : HttpInput_->Headers()) { httpHeaders << header.Name() << ": " << header.Value() << "; "; } httpHeaders << ")"; auto errorString = Sprintf("RSP %s - HTTP %d - %s", - RequestId_.data(), + Context_.RequestId.data(), HttpCode_, httpHeaders.Str().data()); YT_LOG_ERROR("%v", errorString.data()); - if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); } else { ErrorResponse_->SetRawError( @@ -735,9 +805,12 @@ THttpResponse::THttpResponse( } } +THttpResponse::~THttpResponse() +{ } + const THttpHeaders& THttpResponse::Headers() const { - return HttpInput_.Headers(); + return HttpInput_->Headers(); } void THttpResponse::CheckErrorResponse() const @@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const const TString& THttpResponse::GetHostName() const { - return HostName_; + return Context_.HostName; } bool THttpResponse::IsKeepAlive() const { - return HttpInput_.IsKeepAlive(); + return HttpInput_->IsKeepAlive(); } TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) { for (const auto& header : headers) { if (header.Name() == "X-YT-Error") { - TErrorResponse errorResponse(HttpCode_, RequestId_); + TErrorResponse errorResponse(HttpCode_, Context_.RequestId); errorResponse.ParseFromJsonError(header.Value()); if (errorResponse.IsOk()) { return Nothing(); @@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len) if (Unframe_) { read = UnframeRead(buf, len); } else { - read = HttpInput_.Read(buf, len); + read = HttpInput_->Read(buf, len); } if (read == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return read; @@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len) if (Unframe_) { skipped = UnframeSkip(len); } else { - skipped = HttpInput_.Skip(len); + skipped = HttpInput_->Skip(len); } if (skipped == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return skipped; @@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers) if (auto errorResponse = ParseError(trailers)) { errorResponse->SetIsFromTrailers(true); YT_LOG_ERROR("RSP %v - %v", - RequestId_, + Context_.RequestId, errorResponse.GetRef().what()); ythrow errorResponse.GetRef(); } } -static ui32 ReadDataFrameSize(THttpInput* stream) +static ui32 ReadDataFrameSize(IInputStream* stream) { ui32 littleEndianSize; auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); @@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary() { while (RemainingFrameSize_ == 0) { ui8 frameTypeByte; - auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte)); if (read == 0) { return false; } @@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= read; return read; } @@ -879,19 +952,22 @@ size_t THttpResponse::UnframeSkip(size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= skipped; return skipped; } //////////////////////////////////////////////////////////////////////////////// -THttpRequest::THttpRequest() - : RequestId_(CreateGuidAsString()) -{ } - -THttpRequest::THttpRequest(const TString& requestId) - : RequestId_(requestId) +THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout) + : Context_(TRequestContext{ + .RequestId = std::move(requestId), + .HostName = std::move(hostName), + .Method = header.GetUrl(/*needProxy=*/ false) + }) + , Header_(std::move(header)) + , Url_(Header_.GetUrl(true)) + , SocketTimeout_(socketTimeout) { } THttpRequest::~THttpRequest() @@ -905,40 +981,41 @@ THttpRequest::~THttpRequest() // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName_, Connection_); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId_; + return Context_.RequestId; } -void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) { - HostName_ = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId_, - HostName_); + Context_.RequestId, + Context_.HostName); StartTime_ = TInstant::Now(); - Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout); + + try { + Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId_, + Context_.RequestId, Connection_->Id); -} -IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) -{ - auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, includeParameters); - Url_ = header.GetUrl(true); + auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters); - LogRequest(header, Url_, includeParameters, RequestId_, HostName_); + LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName); - LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128); - auto outputFormat = header.GetOutputFormat(); + auto outputFormat = Header_.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { LogResponse_ = true; } @@ -949,9 +1026,9 @@ IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool in return RequestStream_.Get(); } -IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +IOutputStream* THttpRequest::StartRequest() { - return StartRequestImpl(header, true); + return StartRequestImpl(true); } void THttpRequest::FinishRequest() @@ -960,16 +1037,16 @@ void THttpRequest::FinishRequest() RequestStream_->Finish(); } -void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) { - if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { - const auto& parameters = header.GetParameters(); + if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) { + const auto& parameters = Header_.GetParameters(); auto parametersStr = NodeToYsonString(parameters); - auto* output = StartRequestImpl(header, false); + auto* output = StartRequestImpl(false); output->Write(parametersStr); FinishRequest(); } else { - auto* output = StartRequest(header); + auto* output = StartRequest(); if (body) { output->Write(*body); } @@ -983,9 +1060,9 @@ THttpResponse* THttpRequest::GetResponseStream() SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_)); + Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); } else { - Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_)); + Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); } Input_->CheckErrorResponse(); } @@ -1005,12 +1082,12 @@ TString THttpRequest::GetResponse() if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId_, + Context_.RequestId, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId_, + Context_.RequestId, result.size(), loggedAttributes.Str()); } @@ -1023,7 +1100,7 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName_, Connection_); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); Connection_.Reset(); } diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 013b40aecb..618b1e2c22 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -39,6 +39,12 @@ enum class EFrameType KeepAlive = 0x02, }; +struct TRequestContext +{ + TString RequestId; + TString HostName; + TString Method; +}; class THttpHeader { @@ -171,9 +177,10 @@ public: // 'requestId' and 'hostName' are provided for debug reasons // (they will appear in some error messages). THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName); + TRequestContext context, + IInputStream* socketStream); + + ~THttpResponse(); const THttpHeaders& Headers() const; @@ -195,13 +202,14 @@ private: bool RefreshFrameIfNecessary(); private: - const TString RequestId_; + class THttpInputWrapped; - THttpInput HttpInput_; +private: + THolder<THttpInputWrapped> HttpInput_; - const TString HostName_; const bool Unframe_; + TRequestContext Context_; int HttpCode_ = 0; TMaybe<TErrorResponse> ErrorResponse_; bool IsExhausted_ = false; @@ -213,18 +221,15 @@ private: class THttpRequest { public: - THttpRequest(); - THttpRequest(const TString& requestId); + THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout); ~THttpRequest(); TString GetRequestId() const; - void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero()); - - IOutputStream* StartRequest(const THttpHeader& header); + IOutputStream* StartRequest(); void FinishRequest(); - void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body); + void SmallRequest(TMaybe<TStringBuf> body); THttpResponse* GetResponseStream(); @@ -235,16 +240,17 @@ public: int GetHttpCode(); private: - IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters); + IOutputStream* StartRequestImpl(bool includeParameters); private: class TRequestStream; private: - const TString RequestId_; + const TRequestContext Context_; + const THttpHeader Header_; + const TString Url_; + const TDuration SocketTimeout_; - TString HostName_; - TString Url_; TInstant StartTime_; TString LoggedAttributes_; diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 97bf4a7703..7e9d761c3c 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -167,13 +167,12 @@ class TDefaultHttpClient public: IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); auto host = CreateHost(urlRef.Host, urlRef.PortStr); - request->Connect(host, config.SocketTimeout); - request->SmallRequest(header, body); + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); + + request->SmallRequest(body); return std::make_unique<TDefaultHttpResponse>(std::move(request)); } @@ -182,10 +181,9 @@ public: auto urlRef = NHttp::ParseUrl(url); auto host = CreateHost(urlRef.Host, urlRef.PortStr); - auto request = std::make_unique<THttpRequest>(requestId); + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); - request->Connect(host, config.SocketTimeout); - auto stream = request->StartRequest(header); + auto stream = request->StartRequest(); return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); } }; diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp index 7d95a10bc2..cd610d6493 100644 --- a/yt/cpp/mapreduce/http/requests.cpp +++ b/yt/cpp/mapreduce/http/requests.cpp @@ -2,7 +2,6 @@ #include "context.h" #include "host_manager.h" -#include "retry_request.h" #include <yt/cpp/mapreduce/client/transaction.h> diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 307e310b5b..0c719eeda3 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -114,7 +114,7 @@ TResponseInfo RetryRequestWithPolicy( return Request(context, header, body, requestId, config); } catch (const TErrorResponse& e) { - LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription()); + LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); retryWithSameMutationId = e.IsTransportError(); if (!IsRetriable(e)) { diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp index 90196246c5..fa072675fb 100644 --- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -72,11 +72,10 @@ THolder<TSimpleServer> CreateProxyHttpServer() ui64 port; ParseFirstLine(inputStr, method, host, port, command); - THttpRequest request; const TString hostName = ::TStringBuilder() << host << ":" << port; - request.Connect(hostName); auto header = THttpHeader(method, command); - request.StartRequest(header); + THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); THttpOutput httpOutput(output); @@ -138,9 +137,8 @@ TEST(TConnectionPool, TestReleaseUnread) const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -155,12 +153,12 @@ TEST(TConnectionPool, TestProxy) const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName2); auto header = THttpHeader("GET", "foo"); header.SetProxyAddress(hostName2); header.SetHostPort(hostName); - request.StartRequest(header); + + THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -176,9 +174,8 @@ TEST(TConnectionPool, TestConcurrency) const auto func = [&] { for (int i = 0; i != 100; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); res->ReadAll(); diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp index ca260841d0..e41e83c5a0 100644 --- a/yt/cpp/mapreduce/http/ut/http_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -70,9 +70,8 @@ TEST(TFramingTest, FramingSimple) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); *requestStream << "Some funny data"; request.FinishRequest(); auto response = request.GetResponseStream()->ReadAll(); @@ -83,9 +82,8 @@ TEST(TFramingTest, FramingLarge) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); auto data = TString(100000, 'x'); *requestStream << data; request.FinishRequest(); diff --git a/yt/cpp/mapreduce/interface/errors.cpp b/yt/cpp/mapreduce/interface/errors.cpp index ef3d2db4a3..0819c8ca56 100644 --- a/yt/cpp/mapreduce/interface/errors.cpp +++ b/yt/cpp/mapreduce/interface/errors.cpp @@ -20,20 +20,11 @@ using namespace NJson; static void WriteErrorDescription(const TYtError& error, IOutputStream* out) { - (*out) << '\'' << error.GetMessage() << '\''; + (*out) << error.GetMessage(); const auto& innerErrorList = error.InnerErrors(); if (!innerErrorList.empty()) { - (*out) << " { "; - bool first = true; - for (const auto& innerError : innerErrorList) { - if (first) { - first = false; - } else { - (*out) << " ; "; - } - WriteErrorDescription(innerError, out); - } - (*out) << " }"; + (*out) << ": "; + WriteErrorDescription(innerErrorList[0], out); } } @@ -118,9 +109,11 @@ TYtError::TYtError(const TString& message) , Message_(message) { } -TYtError::TYtError(int code, const TString& message) +TYtError::TYtError(int code, TString message, TVector<TYtError> innerError, TNode::TMapType attributes) : Code_(code) , Message_(message) + , InnerErrors_(innerError) + , Attributes_(attributes) { } TYtError::TYtError(const TJsonValue& value) @@ -396,6 +389,11 @@ void TErrorResponse::Setup() *this << Error_.FullDescription(); } +TTransportError::TTransportError(TYtError error) +{ + *this << error.FullDescription(); +} + //////////////////////////////////////////////////////////////////////////////// TOperationFailedError::TOperationFailedError( diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h index afad58ed72..1311dbcf3d 100644 --- a/yt/cpp/mapreduce/interface/errors.h +++ b/yt/cpp/mapreduce/interface/errors.h @@ -6,14 +6,14 @@ /// Errors and exceptions emitted by library. #include "fwd.h" -#include "common.h" #include <library/cpp/yson/node/node.h> #include <util/generic/bt_exception.h> -#include <util/generic/yexception.h> +#include <util/generic/guid.h> #include <util/generic/string.h> #include <util/generic/vector.h> +#include <util/generic/yexception.h> namespace NJson { class TJsonValue; @@ -67,8 +67,8 @@ public: /// Constructs error with NYT::NClusterErrorCodes::Generic code and given message. explicit TYtError(const TString& message); - /// Constructs error with given code and given message. - TYtError(int code, const TString& message); + /// Constructs error from given parameters. + TYtError(int code, TString message, TVector<TYtError> inner = {}, TNode::TMapType attributes = {}); /// Construct error from json representation. TYtError(const ::NJson::TJsonValue& value); @@ -158,7 +158,6 @@ class TErrorResponse { public: TErrorResponse(int httpCode, const TString& requestId); - TErrorResponse(int httpCode, TYtError error); /// Get error object returned by server. const TYtError& GetError() const; @@ -222,6 +221,16 @@ private: //////////////////////////////////////////////////////////////////////////////// +/// @brief System error indicating that response from server cannot be received +class TTransportError + : public yexception +{ +public: + explicit TTransportError(TYtError error); +}; + +//////////////////////////////////////////////////////////////////////////////// + /// Info about failed jobs. /// /// @see NYT::TOperationFailedError diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 59868c599e..98a8aa0792 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -728,7 +728,6 @@ private: } private: - THttpRequest Request_; NHttpClient::IHttpResponsePtr Response_; IInputStream* ResponseStream_; }; |