diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-11-20 11:14:58 +0000 |
commit | 31773f157bf8164364649b5f470f52dece0a4317 (patch) | |
tree | 33d0f7eef45303ab68cf08ab381ce5e5e36c5240 /yt/cpp/mapreduce/http | |
parent | 2c7938962d8689e175574fc1e817c05049f27905 (diff) | |
parent | eff600952d5dfe17942f38f510a8ac2b203bb3a5 (diff) | |
download | ydb-31773f157bf8164364649b5f470f52dece0a4317.tar.gz |
Merge branch 'rightlib' into mergelibs-241120-1113
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.cpp | 13 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 340 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http.h | 87 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 16 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/requests.cpp | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/retry_request.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp | 21 | ||||
-rw-r--r-- | yt/cpp/mapreduce/http/ut/http_ut.cpp | 10 |
10 files changed, 285 insertions, 210 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp index 9da9241d33..995bb9de4c 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.cpp +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -14,20 +14,20 @@ public: { auto g = Guard(Lock_); auto id = NextId_++; - IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); + IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); return id; } void StopOutage(TOutageId id) { auto g = Guard(Lock_); - IdToOutage.erase(id); + IdToOutage_.erase(id); } void Add(IAbortableHttpResponse* response) { auto g = Guard(Lock_); - for (auto& [id, entry] : IdToOutage) { + for (auto& [id, entry] : IdToOutage_) { if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) { response->SetLengthLimit(entry.LengthLimit); entry.Counter -= 1; @@ -70,7 +70,7 @@ private: private: TOutageId NextId_ = 0; TIntrusiveList<IAbortableHttpResponse> ResponseList_; - THashMap<TOutageId, TOutageEntry> IdToOutage; + THashMap<TOutageId, TOutageEntry> IdToOutage_; TMutex Lock_; }; @@ -137,11 +137,10 @@ bool TAbortableHttpResponseBase::IsAborted() const //////////////////////////////////////////////////////////////////////////////// TAbortableHttpResponse::TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url) - : THttpResponse(socketStream, requestId, hostName) + : THttpResponse(std::move(context), socketStream) , TAbortableHttpResponseBase(url) { } diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h index d72bcfa0a6..e9b1483bf7 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.h +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -108,9 +108,8 @@ public: public: TAbortableHttpResponse( + TRequestContext context, IInputStream* socketStream, - const TString& requestId, - const TString& hostName, const TString& url); /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url). diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index f9eb8539b5..ca243a929a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/http/http.h> @@ -39,6 +40,27 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +std::exception_ptr WrapSystemError( + const TRequestContext& context, + const std::exception& ex) +{ + if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) { + return std::make_exception_ptr(errorResponse); + } + + auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method); + TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, { + {"request_id", context.RequestId}, + {"host", context.HostName}, + {"method", context.Method}, + }); + TTransportError errorResponse(std::move(outer)); + + return std::make_exception_ptr(errorResponse); +} + +//////////////////////////////////////////////////////////////////////////////// + class THttpRequest::TRequestStream : public IOutputStream { @@ -92,17 +114,17 @@ private: CheckErrorState(); try { func(); - } catch (const std::exception&) { - HandleWriteException(); + } catch (const std::exception& ex) { + HandleWriteException(ex); } } // In many cases http proxy stops reading request and resets connection // if error has happend. This function tries to read error response // in such cases. - void HandleWriteException() { + void HandleWriteException(const std::exception& ex) { Y_ABORT_UNLESS(WriteError_ == nullptr); - WriteError_ = std::current_exception(); + WriteError_ = WrapSystemError(HttpRequest_->Context_, ex); Y_ABORT_UNLESS(WriteError_ != nullptr); try { HttpRequest_->GetResponseStream(); @@ -130,16 +152,16 @@ private: //////////////////////////////////////////////////////////////////////////////// THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) - : Method(method) - , Command(command) - , IsApi(isApi) + : Method_(method) + , Command_(command) + , IsApi_(isApi) { } void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) { - auto it = Parameters.find(key); - if (it == Parameters.end()) { - Parameters.emplace(key, std::move(value)); + auto it = Parameters_.find(key); + if (it == Parameters_.end()) { + Parameters_.emplace(key, std::move(value)); } else { if (overwrite) { it->second = std::move(value); @@ -158,12 +180,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) void THttpHeader::RemoveParameter(const TString& key) { - Parameters.erase(key); + Parameters_.erase(key); } TNode THttpHeader::GetParameters() const { - return Parameters; + return Parameters_; } void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) @@ -202,81 +224,81 @@ void THttpHeader::AddMutationId() bool THttpHeader::HasMutationId() const { - return Parameters.contains("mutation_id"); + return Parameters_.contains("mutation_id"); } void THttpHeader::SetToken(const TString& token) { - Token = token; + Token_ = token; } void THttpHeader::SetProxyAddress(const TString& proxyAddress) { - ProxyAddress = proxyAddress; + ProxyAddress_ = proxyAddress; } void THttpHeader::SetHostPort(const TString& hostPort) { - HostPort = hostPort; + HostPort_ = hostPort; } void THttpHeader::SetImpersonationUser(const TString& impersonationUser) { - ImpersonationUser = impersonationUser; + ImpersonationUser_ = impersonationUser; } void THttpHeader::SetServiceTicket(const TString& ticket) { - ServiceTicket = ticket; + ServiceTicket_ = ticket; } void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) { - InputFormat = format; + InputFormat_ = format; } void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) { - OutputFormat = format; + OutputFormat_ = format; } TMaybe<TFormat> THttpHeader::GetOutputFormat() const { - return OutputFormat; + return OutputFormat_; } void THttpHeader::SetRequestCompression(const TString& compression) { - RequestCompression = compression; + RequestCompression_ = compression; } void THttpHeader::SetResponseCompression(const TString& compression) { - ResponseCompression = compression; + ResponseCompression_ = compression; } TString THttpHeader::GetCommand() const { - return Command; + return Command_; } TString THttpHeader::GetUrl(bool needProxy) const { TStringStream url; - if (needProxy && !ProxyAddress.empty()) { - url << ProxyAddress << "/"; + if (needProxy && !ProxyAddress_.empty()) { + url << ProxyAddress_ << "/"; return url.Str(); } - if (!ProxyAddress.empty()) { - url << HostPort; + if (!ProxyAddress_.empty()) { + url << HostPort_; } - if (IsApi) { - url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + if (IsApi_) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_; } else { - url << "/" << Command; + url << "/" << Command_; } return url.Str(); @@ -284,16 +306,16 @@ TString THttpHeader::GetUrl(bool needProxy) const bool THttpHeader::ShouldAcceptFraming() const { - return TConfig::Get()->CommandsWithFraming.contains(Command); + return TConfig::Get()->CommandsWithFraming.contains(Command_); } TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const { TStringStream result; - result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n"; - GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result); + GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result); if (ShouldAcceptFraming()) { result << "X-YT-Accept-Framing: 1\r\n"; @@ -311,25 +333,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const headers->Add("Host", hostName); headers->Add("User-Agent", TProcessState::Get()->ClientVersion); - if (!Token.empty()) { - headers->Add("Authorization", "OAuth " + Token); + if (!Token_.empty()) { + headers->Add("Authorization", "OAuth " + Token_); } - if (!ServiceTicket.empty()) { - headers->Add("X-Ya-Service-Ticket", ServiceTicket); + if (!ServiceTicket_.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket_); } - if (!ImpersonationUser.empty()) { - headers->Add("X-Yt-User-Name", ImpersonationUser); + if (!ImpersonationUser_.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser_); } - if (Method == "PUT" || Method == "POST") { + if (Method_ == "PUT" || Method_ == "POST") { headers->Add("Transfer-Encoding", "chunked"); } headers->Add("X-YT-Correlation-Id", requestId); headers->Add("X-YT-Header-Format", "<format=text>yson"); - headers->Add("Content-Encoding", RequestCompression); - headers->Add("Accept-Encoding", ResponseCompression); + headers->Add("Content-Encoding", RequestCompression_); + headers->Add("Accept-Encoding", ResponseCompression_); auto printYTHeader = [&headers] (const char* headerName, const TString& value) { static const size_t maxHttpHeaderSize = 64 << 10; @@ -353,14 +375,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const } while (ptr != finish); }; - if (InputFormat) { - printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + if (InputFormat_) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config)); } - if (OutputFormat) { - printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + if (OutputFormat_) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config)); } if (includeParameters) { - printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_)); } return NHttp::THeadersPtrWrapper(std::move(headers)); @@ -368,7 +390,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const const TString& THttpHeader::GetMethod() const { - return Method; + return Method_; } //////////////////////////////////////////////////////////////////////////////// @@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) //////////////////////////////////////////////////////////////////////////////// -static TMaybe<TString> GetProxyName(const THttpInput& input) +class THttpResponse::THttpInputWrapped + : public IInputStream { - if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { - return proxyHeader->Value(); +public: + explicit THttpInputWrapped(TRequestContext context, IInputStream* input) + : Context_(std::move(context)) + , HttpInput_(input) + { } + + const THttpHeaders& Headers() const noexcept + { + return HttpInput_.Headers(); + } + + const TString& FirstLine() const noexcept + { + return HttpInput_.FirstLine(); + } + + bool IsKeepAlive() const noexcept + { + return HttpInput_.IsKeepAlive(); + } + + const TMaybe<THttpHeaders>& Trailers() const noexcept + { + return HttpInput_.Trailers(); } - return Nothing(); -} + +private: + size_t DoRead(void* buf, size_t len) override + { + try { + return HttpInput_.Read(buf, len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + + size_t DoSkip(size_t len) override + { + try { + return HttpInput_.Skip(len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + +private: + const TRequestContext Context_; + THttpInput HttpInput_; +}; THttpResponse::THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName) - : HttpInput_(socketStream) - , RequestId_(requestId) - , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) - , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) -{ - HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + TRequestContext context, + IInputStream* socketStream) + : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) + , Context_(std::move(context)) +{ + if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) { + Context_.HostName = proxyHeader->Value(); + } + HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine()); if (HttpCode_ == 200 || HttpCode_ == 202) { return; } - ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); auto logAndSetError = [&] (const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", - RequestId_, + Context_.RequestId, HttpCode_, rawError.data()); ErrorResponse_->SetRawError(rawError); @@ -705,26 +775,26 @@ THttpResponse::THttpResponse( break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { TStringStream httpHeaders; httpHeaders << "HTTP headers ("; - for (const auto& header : HttpInput_.Headers()) { + for (const auto& header : HttpInput_->Headers()) { httpHeaders << header.Name() << ": " << header.Value() << "; "; } httpHeaders << ")"; auto errorString = Sprintf("RSP %s - HTTP %d - %s", - RequestId_.data(), + Context_.RequestId.data(), HttpCode_, httpHeaders.Str().data()); YT_LOG_ERROR("%v", errorString.data()); - if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); } else { ErrorResponse_->SetRawError( @@ -735,9 +805,12 @@ THttpResponse::THttpResponse( } } +THttpResponse::~THttpResponse() +{ } + const THttpHeaders& THttpResponse::Headers() const { - return HttpInput_.Headers(); + return HttpInput_->Headers(); } void THttpResponse::CheckErrorResponse() const @@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const const TString& THttpResponse::GetHostName() const { - return HostName_; + return Context_.HostName; } bool THttpResponse::IsKeepAlive() const { - return HttpInput_.IsKeepAlive(); + return HttpInput_->IsKeepAlive(); } TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) { for (const auto& header : headers) { if (header.Name() == "X-YT-Error") { - TErrorResponse errorResponse(HttpCode_, RequestId_); + TErrorResponse errorResponse(HttpCode_, Context_.RequestId); errorResponse.ParseFromJsonError(header.Value()); if (errorResponse.IsOk()) { return Nothing(); @@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len) if (Unframe_) { read = UnframeRead(buf, len); } else { - read = HttpInput_.Read(buf, len); + read = HttpInput_->Read(buf, len); } if (read == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return read; @@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len) if (Unframe_) { skipped = UnframeSkip(len); } else { - skipped = HttpInput_.Skip(len); + skipped = HttpInput_->Skip(len); } if (skipped == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return skipped; @@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers) if (auto errorResponse = ParseError(trailers)) { errorResponse->SetIsFromTrailers(true); YT_LOG_ERROR("RSP %v - %v", - RequestId_, + Context_.RequestId, errorResponse.GetRef().what()); ythrow errorResponse.GetRef(); } } -static ui32 ReadDataFrameSize(THttpInput* stream) +static ui32 ReadDataFrameSize(IInputStream* stream) { ui32 littleEndianSize; auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); @@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary() { while (RemainingFrameSize_ == 0) { ui8 frameTypeByte; - auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte)); if (read == 0) { return false; } @@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= read; return read; } @@ -879,80 +952,83 @@ size_t THttpResponse::UnframeSkip(size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= skipped; return skipped; } //////////////////////////////////////////////////////////////////////////////// -THttpRequest::THttpRequest() -{ - RequestId = CreateGuidAsString(); -} - -THttpRequest::THttpRequest(const TString& requestId) - : RequestId(requestId) +THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout) + : Context_(TRequestContext{ + .RequestId = std::move(requestId), + .HostName = std::move(hostName), + .Method = header.GetUrl(/*needProxy=*/ false) + }) + , Header_(std::move(header)) + , Url_(Header_.GetUrl(true)) + , SocketTimeout_(socketTimeout) { } THttpRequest::~THttpRequest() { - if (!Connection) { + if (!Connection_) { return; } - if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) { // We should return to the pool only connections where HTTP response was fully read. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). - TConnectionPool::Get()->Release(Connection); + TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName, Connection); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId; + return Context_.RequestId; } -void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) { - HostName = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId, - HostName); + Context_.RequestId, + Context_.HostName); StartTime_ = TInstant::Now(); - Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + + try { + Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId, - Connection->Id); -} + Context_.RequestId, + Connection_->Id); -IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) -{ - auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); - Url_ = header.GetUrl(true); + auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters); - LogRequest(header, Url_, includeParameters, RequestId, HostName); + LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName); - LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128); - auto outputFormat = header.GetOutputFormat(); + auto outputFormat = Header_.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { - LogResponse = true; + LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); RequestStream_->Write(strHeader.data(), strHeader.size()); return RequestStream_.Get(); } -IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +IOutputStream* THttpRequest::StartRequest() { - return StartRequestImpl(header, true); + return StartRequestImpl(true); } void THttpRequest::FinishRequest() @@ -961,16 +1037,16 @@ void THttpRequest::FinishRequest() RequestStream_->Finish(); } -void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) { - if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { - const auto& parameters = header.GetParameters(); + if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) { + const auto& parameters = Header_.GetParameters(); auto parametersStr = NodeToYsonString(parameters); - auto* output = StartRequestImpl(header, false); + auto* output = StartRequestImpl(false); output->Write(parametersStr); FinishRequest(); } else { - auto* output = StartRequest(header); + auto* output = StartRequest(); if (body) { output->Write(*body); } @@ -980,17 +1056,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo THttpResponse* THttpRequest::GetResponseStream() { - if (!Input) { - SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (!Input_) { + SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); } else { - Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); } - Input->CheckErrorResponse(); + Input_->CheckErrorResponse(); } - return Input.Get(); + return Input_.Get(); } TString THttpRequest::GetResponse() @@ -1003,15 +1079,15 @@ TString THttpRequest::GetResponse() << "HostName: " << GetResponseStream()->GetHostName() << "; " << LoggedAttributes_; - if (LogResponse) { + if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId, + Context_.RequestId, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId, + Context_.RequestId, result.size(), loggedAttributes.Str()); } @@ -1024,8 +1100,8 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName, Connection); - Connection.Reset(); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); + Connection_.Reset(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 95595959ad..618b1e2c22 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -39,6 +39,12 @@ enum class EFrameType KeepAlive = 0x02, }; +struct TRequestContext +{ + TString RequestId; + TString HostName; + TString Method; +}; class THttpHeader { @@ -82,24 +88,23 @@ private: bool ShouldAcceptFraming() const; private: - const TString Method; - const TString Command; - const bool IsApi; - - TNode::TMapType Parameters; - TString ImpersonationUser; - TString Token; - TString ServiceTicket; - TNode Attributes; - TString ProxyAddress; - TString HostPort; - -private: - TMaybe<TFormat> InputFormat = TFormat::YsonText(); - TMaybe<TFormat> OutputFormat = TFormat::YsonText(); - - TString RequestCompression = "identity"; - TString ResponseCompression = "identity"; + const TString Method_; + const TString Command_; + const bool IsApi_; + + TNode::TMapType Parameters_; + TString ImpersonationUser_; + TString Token_; + TString ServiceTicket_; + TNode Attributes_; + TString ProxyAddress_; + TString HostPort_; + + TMaybe<TFormat> InputFormat_ = TFormat::YsonText(); + TMaybe<TFormat> OutputFormat_ = TFormat::YsonText(); + + TString RequestCompression_ = "identity"; + TString ResponseCompression_ = "identity"; }; //////////////////////////////////////////////////////////////////////////////// @@ -172,9 +177,10 @@ public: // 'requestId' and 'hostName' are provided for debug reasons // (they will appear in some error messages). THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName); + TRequestContext context, + IInputStream* socketStream); + + ~THttpResponse(); const THttpHeaders& Headers() const; @@ -196,13 +202,17 @@ private: bool RefreshFrameIfNecessary(); private: - THttpInput HttpInput_; - const TString RequestId_; - const TString HostName_; + class THttpInputWrapped; + +private: + THolder<THttpInputWrapped> HttpInput_; + + const bool Unframe_; + + TRequestContext Context_; int HttpCode_ = 0; TMaybe<TErrorResponse> ErrorResponse_; bool IsExhausted_ = false; - const bool Unframe_; size_t RemainingFrameSize_ = 0; }; @@ -211,18 +221,15 @@ private: class THttpRequest { public: - THttpRequest(); - THttpRequest(const TString& requestId); + THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout); ~THttpRequest(); TString GetRequestId() const; - void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero()); - - IOutputStream* StartRequest(const THttpHeader& header); + IOutputStream* StartRequest(); void FinishRequest(); - void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body); + void SmallRequest(TMaybe<TStringBuf> body); THttpResponse* GetResponseStream(); @@ -233,26 +240,28 @@ public: int GetHttpCode(); private: - IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters); + IOutputStream* StartRequestImpl(bool includeParameters); private: class TRequestStream; private: - TString HostName; - TString RequestId; - TString Url_; + const TRequestContext Context_; + const THttpHeader Header_; + const TString Url_; + const TDuration SocketTimeout_; + TInstant StartTime_; TString LoggedAttributes_; - TConnectionPtr Connection; + TConnectionPtr Connection_; THolder<TRequestStream> RequestStream_; - THolder<TSocketInput> SocketInput; - THolder<THttpResponse> Input; + THolder<TSocketInput> SocketInput_; + THolder<THttpResponse> Input_; - bool LogResponse = false; + bool LogResponse_ = false; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 305d95b06c..7e9d761c3c 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -167,23 +167,23 @@ class TDefaultHttpClient public: IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); + + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); - request->SmallRequest(header, body); + request->SmallRequest(body); return std::make_unique<TDefaultHttpResponse>(std::move(request)); } IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override { - auto request = std::make_unique<THttpRequest>(requestId); - auto urlRef = NHttp::ParseUrl(url); + auto host = CreateHost(urlRef.Host, urlRef.PortStr); + + auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout); - request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); - auto stream = request->StartRequest(header); + auto stream = request->StartRequest(); return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); } }; diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index 97321c4c9d..6087eca098 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -11,8 +11,6 @@ #include <util/stream/fwd.h> -#include <memory> - namespace NYT::NHttpClient { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp index 7d95a10bc2..cd610d6493 100644 --- a/yt/cpp/mapreduce/http/requests.cpp +++ b/yt/cpp/mapreduce/http/requests.cpp @@ -2,7 +2,6 @@ #include "context.h" #include "host_manager.h" -#include "retry_request.h" #include <yt/cpp/mapreduce/client/transaction.h> diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 307e310b5b..0c719eeda3 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -114,7 +114,7 @@ TResponseInfo RetryRequestWithPolicy( return Request(context, header, body, requestId, config); } catch (const TErrorResponse& e) { - LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription()); + LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); retryWithSameMutationId = e.IsTransportError(); if (!IsRetriable(e)) { diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp index 90196246c5..fa072675fb 100644 --- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -72,11 +72,10 @@ THolder<TSimpleServer> CreateProxyHttpServer() ui64 port; ParseFirstLine(inputStr, method, host, port, command); - THttpRequest request; const TString hostName = ::TStringBuilder() << host << ":" << port; - request.Connect(hostName); auto header = THttpHeader(method, command); - request.StartRequest(header); + THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); THttpOutput httpOutput(output); @@ -138,9 +137,8 @@ TEST(TConnectionPool, TestReleaseUnread) const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -155,12 +153,12 @@ TEST(TConnectionPool, TestProxy) const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort(); for (size_t i = 0; i != 10; ++i) { - THttpRequest request; - request.Connect(hostName2); auto header = THttpHeader("GET", "foo"); header.SetProxyAddress(hostName2); header.SetHostPort(hostName); - request.StartRequest(header); + + THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); request.GetResponseStream(); } @@ -176,9 +174,8 @@ TEST(TConnectionPool, TestConcurrency) const auto func = [&] { for (int i = 0; i != 100; ++i) { - THttpRequest request; - request.Connect(hostName); - request.StartRequest(THttpHeader("GET", "foo")); + THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero()); + request.StartRequest(); request.FinishRequest(); auto res = request.GetResponseStream(); res->ReadAll(); diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp index ca260841d0..e41e83c5a0 100644 --- a/yt/cpp/mapreduce/http/ut/http_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -70,9 +70,8 @@ TEST(TFramingTest, FramingSimple) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); *requestStream << "Some funny data"; request.FinishRequest(); auto response = request.GetResponseStream()->ReadAll(); @@ -83,9 +82,8 @@ TEST(TFramingTest, FramingLarge) { auto server = CreateFramingEchoServer(); - THttpRequest request; - request.Connect(server->GetAddress()); - auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero()); + auto requestStream = request.StartRequest(); auto data = TString(100000, 'x'); *requestStream << data; request.FinishRequest(); |