diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-11-20 17:37:57 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 17:37:57 +0000 |
commit | f76323e9b295c15751e51e3443aa47a36bee8023 (patch) | |
tree | 4113c8cad473a33e0f746966e0cf087252fa1d7a /yt/cpp/mapreduce/http/http.cpp | |
parent | 753ecb8d410a4cb459c26f3a0082fb2d1724fe63 (diff) | |
parent | a7b9a6afea2a9d7a7bfac4c5eb4c1a8e60adb9e6 (diff) | |
download | ydb-f76323e9b295c15751e51e3443aa47a36bee8023.tar.gz |
Merge pull request #11788 from ydb-platform/mergelibs-241120-1113
Library import 241120-1113
Diffstat (limited to 'yt/cpp/mapreduce/http/http.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 340 |
1 files changed, 208 insertions, 132 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index f9eb8539b5..ca243a929a 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -10,6 +10,7 @@ #include <yt/cpp/mapreduce/interface/config.h> #include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/http/http.h> @@ -39,6 +40,27 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +std::exception_ptr WrapSystemError( + const TRequestContext& context, + const std::exception& ex) +{ + if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) { + return std::make_exception_ptr(errorResponse); + } + + auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method); + TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, { + {"request_id", context.RequestId}, + {"host", context.HostName}, + {"method", context.Method}, + }); + TTransportError errorResponse(std::move(outer)); + + return std::make_exception_ptr(errorResponse); +} + +//////////////////////////////////////////////////////////////////////////////// + class THttpRequest::TRequestStream : public IOutputStream { @@ -92,17 +114,17 @@ private: CheckErrorState(); try { func(); - } catch (const std::exception&) { - HandleWriteException(); + } catch (const std::exception& ex) { + HandleWriteException(ex); } } // In many cases http proxy stops reading request and resets connection // if error has happend. This function tries to read error response // in such cases. - void HandleWriteException() { + void HandleWriteException(const std::exception& ex) { Y_ABORT_UNLESS(WriteError_ == nullptr); - WriteError_ = std::current_exception(); + WriteError_ = WrapSystemError(HttpRequest_->Context_, ex); Y_ABORT_UNLESS(WriteError_ != nullptr); try { HttpRequest_->GetResponseStream(); @@ -130,16 +152,16 @@ private: //////////////////////////////////////////////////////////////////////////////// THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) - : Method(method) - , Command(command) - , IsApi(isApi) + : Method_(method) + , Command_(command) + , IsApi_(isApi) { } void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) { - auto it = Parameters.find(key); - if (it == Parameters.end()) { - Parameters.emplace(key, std::move(value)); + auto it = Parameters_.find(key); + if (it == Parameters_.end()) { + Parameters_.emplace(key, std::move(value)); } else { if (overwrite) { it->second = std::move(value); @@ -158,12 +180,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) void THttpHeader::RemoveParameter(const TString& key) { - Parameters.erase(key); + Parameters_.erase(key); } TNode THttpHeader::GetParameters() const { - return Parameters; + return Parameters_; } void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) @@ -202,81 +224,81 @@ void THttpHeader::AddMutationId() bool THttpHeader::HasMutationId() const { - return Parameters.contains("mutation_id"); + return Parameters_.contains("mutation_id"); } void THttpHeader::SetToken(const TString& token) { - Token = token; + Token_ = token; } void THttpHeader::SetProxyAddress(const TString& proxyAddress) { - ProxyAddress = proxyAddress; + ProxyAddress_ = proxyAddress; } void THttpHeader::SetHostPort(const TString& hostPort) { - HostPort = hostPort; + HostPort_ = hostPort; } void THttpHeader::SetImpersonationUser(const TString& impersonationUser) { - ImpersonationUser = impersonationUser; + ImpersonationUser_ = impersonationUser; } void THttpHeader::SetServiceTicket(const TString& ticket) { - ServiceTicket = ticket; + ServiceTicket_ = ticket; } void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) { - InputFormat = format; + InputFormat_ = format; } void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) { - OutputFormat = format; + OutputFormat_ = format; } TMaybe<TFormat> THttpHeader::GetOutputFormat() const { - return OutputFormat; + return OutputFormat_; } void THttpHeader::SetRequestCompression(const TString& compression) { - RequestCompression = compression; + RequestCompression_ = compression; } void THttpHeader::SetResponseCompression(const TString& compression) { - ResponseCompression = compression; + ResponseCompression_ = compression; } TString THttpHeader::GetCommand() const { - return Command; + return Command_; } TString THttpHeader::GetUrl(bool needProxy) const { TStringStream url; - if (needProxy && !ProxyAddress.empty()) { - url << ProxyAddress << "/"; + if (needProxy && !ProxyAddress_.empty()) { + url << ProxyAddress_ << "/"; return url.Str(); } - if (!ProxyAddress.empty()) { - url << HostPort; + if (!ProxyAddress_.empty()) { + url << HostPort_; } - if (IsApi) { - url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + if (IsApi_) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_; } else { - url << "/" << Command; + url << "/" << Command_; } return url.Str(); @@ -284,16 +306,16 @@ TString THttpHeader::GetUrl(bool needProxy) const bool THttpHeader::ShouldAcceptFraming() const { - return TConfig::Get()->CommandsWithFraming.contains(Command); + return TConfig::Get()->CommandsWithFraming.contains(Command_); } TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const { TStringStream result; - result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n"; - GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result); + GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result); if (ShouldAcceptFraming()) { result << "X-YT-Accept-Framing: 1\r\n"; @@ -311,25 +333,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const headers->Add("Host", hostName); headers->Add("User-Agent", TProcessState::Get()->ClientVersion); - if (!Token.empty()) { - headers->Add("Authorization", "OAuth " + Token); + if (!Token_.empty()) { + headers->Add("Authorization", "OAuth " + Token_); } - if (!ServiceTicket.empty()) { - headers->Add("X-Ya-Service-Ticket", ServiceTicket); + if (!ServiceTicket_.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket_); } - if (!ImpersonationUser.empty()) { - headers->Add("X-Yt-User-Name", ImpersonationUser); + if (!ImpersonationUser_.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser_); } - if (Method == "PUT" || Method == "POST") { + if (Method_ == "PUT" || Method_ == "POST") { headers->Add("Transfer-Encoding", "chunked"); } headers->Add("X-YT-Correlation-Id", requestId); headers->Add("X-YT-Header-Format", "<format=text>yson"); - headers->Add("Content-Encoding", RequestCompression); - headers->Add("Accept-Encoding", ResponseCompression); + headers->Add("Content-Encoding", RequestCompression_); + headers->Add("Accept-Encoding", ResponseCompression_); auto printYTHeader = [&headers] (const char* headerName, const TString& value) { static const size_t maxHttpHeaderSize = 64 << 10; @@ -353,14 +375,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const } while (ptr != finish); }; - if (InputFormat) { - printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + if (InputFormat_) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config)); } - if (OutputFormat) { - printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + if (OutputFormat_) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config)); } if (includeParameters) { - printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_)); } return NHttp::THeadersPtrWrapper(std::move(headers)); @@ -368,7 +390,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const const TString& THttpHeader::GetMethod() const { - return Method; + return Method_; } //////////////////////////////////////////////////////////////////////////////// @@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) //////////////////////////////////////////////////////////////////////////////// -static TMaybe<TString> GetProxyName(const THttpInput& input) +class THttpResponse::THttpInputWrapped + : public IInputStream { - if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { - return proxyHeader->Value(); +public: + explicit THttpInputWrapped(TRequestContext context, IInputStream* input) + : Context_(std::move(context)) + , HttpInput_(input) + { } + + const THttpHeaders& Headers() const noexcept + { + return HttpInput_.Headers(); + } + + const TString& FirstLine() const noexcept + { + return HttpInput_.FirstLine(); + } + + bool IsKeepAlive() const noexcept + { + return HttpInput_.IsKeepAlive(); + } + + const TMaybe<THttpHeaders>& Trailers() const noexcept + { + return HttpInput_.Trailers(); } - return Nothing(); -} + +private: + size_t DoRead(void* buf, size_t len) override + { + try { + return HttpInput_.Read(buf, len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + + size_t DoSkip(size_t len) override + { + try { + return HttpInput_.Skip(len); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } + } + +private: + const TRequestContext Context_; + THttpInput HttpInput_; +}; THttpResponse::THttpResponse( - IInputStream* socketStream, - const TString& requestId, - const TString& hostName) - : HttpInput_(socketStream) - , RequestId_(requestId) - , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) - , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) -{ - HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + TRequestContext context, + IInputStream* socketStream) + : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) + , Context_(std::move(context)) +{ + if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) { + Context_.HostName = proxyHeader->Value(); + } + HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine()); if (HttpCode_ == 200 || HttpCode_ == 202) { return; } - ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); auto logAndSetError = [&] (const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", - RequestId_, + Context_.RequestId, HttpCode_, rawError.data()); ErrorResponse_->SetRawError(rawError); @@ -705,26 +775,26 @@ THttpResponse::THttpResponse( break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { TStringStream httpHeaders; httpHeaders << "HTTP headers ("; - for (const auto& header : HttpInput_.Headers()) { + for (const auto& header : HttpInput_->Headers()) { httpHeaders << header.Name() << ": " << header.Value() << "; "; } httpHeaders << ")"; auto errorString = Sprintf("RSP %s - HTTP %d - %s", - RequestId_.data(), + Context_.RequestId.data(), HttpCode_, httpHeaders.Str().data()); YT_LOG_ERROR("%v", errorString.data()); - if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); } else { ErrorResponse_->SetRawError( @@ -735,9 +805,12 @@ THttpResponse::THttpResponse( } } +THttpResponse::~THttpResponse() +{ } + const THttpHeaders& THttpResponse::Headers() const { - return HttpInput_.Headers(); + return HttpInput_->Headers(); } void THttpResponse::CheckErrorResponse() const @@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const const TString& THttpResponse::GetHostName() const { - return HostName_; + return Context_.HostName; } bool THttpResponse::IsKeepAlive() const { - return HttpInput_.IsKeepAlive(); + return HttpInput_->IsKeepAlive(); } TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) { for (const auto& header : headers) { if (header.Name() == "X-YT-Error") { - TErrorResponse errorResponse(HttpCode_, RequestId_); + TErrorResponse errorResponse(HttpCode_, Context_.RequestId); errorResponse.ParseFromJsonError(header.Value()); if (errorResponse.IsOk()) { return Nothing(); @@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len) if (Unframe_) { read = UnframeRead(buf, len); } else { - read = HttpInput_.Read(buf, len); + read = HttpInput_->Read(buf, len); } if (read == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return read; @@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len) if (Unframe_) { skipped = UnframeSkip(len); } else { - skipped = HttpInput_.Skip(len); + skipped = HttpInput_->Skip(len); } if (skipped == 0 && len != 0) { // THttpInput MUST return defined (but may be empty) // trailers when it is exhausted. - Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(), + Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(), "trailers MUST be defined for exhausted stream"); - CheckTrailers(HttpInput_.Trailers().GetRef()); + CheckTrailers(HttpInput_->Trailers().GetRef()); IsExhausted_ = true; } return skipped; @@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers) if (auto errorResponse = ParseError(trailers)) { errorResponse->SetIsFromTrailers(true); YT_LOG_ERROR("RSP %v - %v", - RequestId_, + Context_.RequestId, errorResponse.GetRef().what()); ythrow errorResponse.GetRef(); } } -static ui32 ReadDataFrameSize(THttpInput* stream) +static ui32 ReadDataFrameSize(IInputStream* stream) { ui32 littleEndianSize; auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); @@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary() { while (RemainingFrameSize_ == 0) { ui8 frameTypeByte; - auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte)); if (read == 0) { return false; } @@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= read; return read; } @@ -879,80 +952,83 @@ size_t THttpResponse::UnframeSkip(size_t len) if (!RefreshFrameIfNecessary()) { return 0; } - auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_)); RemainingFrameSize_ -= skipped; return skipped; } //////////////////////////////////////////////////////////////////////////////// -THttpRequest::THttpRequest() -{ - RequestId = CreateGuidAsString(); -} - -THttpRequest::THttpRequest(const TString& requestId) - : RequestId(requestId) +THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout) + : Context_(TRequestContext{ + .RequestId = std::move(requestId), + .HostName = std::move(hostName), + .Method = header.GetUrl(/*needProxy=*/ false) + }) + , Header_(std::move(header)) + , Url_(Header_.GetUrl(true)) + , SocketTimeout_(socketTimeout) { } THttpRequest::~THttpRequest() { - if (!Connection) { + if (!Connection_) { return; } - if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) { // We should return to the pool only connections where HTTP response was fully read. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). - TConnectionPool::Get()->Release(Connection); + TConnectionPool::Get()->Release(Connection_); } else { - TConnectionPool::Get()->Invalidate(HostName, Connection); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); } } TString THttpRequest::GetRequestId() const { - return RequestId; + return Context_.RequestId; } -void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) { - HostName = std::move(hostName); YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", - RequestId, - HostName); + Context_.RequestId, + Context_.HostName); StartTime_ = TInstant::Now(); - Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + + try { + Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_); + } catch (const std::exception& ex) { + auto wrapped = WrapSystemError(Context_, ex); + std::rethrow_exception(wrapped); + } YT_LOG_DEBUG("REQ %v - connection #%v", - RequestId, - Connection->Id); -} + Context_.RequestId, + Connection_->Id); -IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) -{ - auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); - Url_ = header.GetUrl(true); + auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters); - LogRequest(header, Url_, includeParameters, RequestId, HostName); + LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName); - LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128); - auto outputFormat = header.GetOutputFormat(); + auto outputFormat = Header_.GetOutputFormat(); if (outputFormat && outputFormat->IsTextYson()) { - LogResponse = true; + LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); RequestStream_->Write(strHeader.data(), strHeader.size()); return RequestStream_.Get(); } -IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +IOutputStream* THttpRequest::StartRequest() { - return StartRequestImpl(header, true); + return StartRequestImpl(true); } void THttpRequest::FinishRequest() @@ -961,16 +1037,16 @@ void THttpRequest::FinishRequest() RequestStream_->Finish(); } -void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) { - if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { - const auto& parameters = header.GetParameters(); + if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) { + const auto& parameters = Header_.GetParameters(); auto parametersStr = NodeToYsonString(parameters); - auto* output = StartRequestImpl(header, false); + auto* output = StartRequestImpl(false); output->Write(parametersStr); FinishRequest(); } else { - auto* output = StartRequest(header); + auto* output = StartRequest(); if (body) { output->Write(*body); } @@ -980,17 +1056,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo THttpResponse* THttpRequest::GetResponseStream() { - if (!Input) { - SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (!Input_) { + SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); } else { - Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); } - Input->CheckErrorResponse(); + Input_->CheckErrorResponse(); } - return Input.Get(); + return Input_.Get(); } TString THttpRequest::GetResponse() @@ -1003,15 +1079,15 @@ TString THttpRequest::GetResponse() << "HostName: " << GetResponseStream()->GetHostName() << "; " << LoggedAttributes_; - if (LogResponse) { + if (LogResponse_) { constexpr auto sizeLimit = 1 << 7; YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", - RequestId, + Context_.RequestId, TruncateForLogs(result, sizeLimit), loggedAttributes.Str()); } else { YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", - RequestId, + Context_.RequestId, result.size(), loggedAttributes.Str()); } @@ -1024,8 +1100,8 @@ int THttpRequest::GetHttpCode() { void THttpRequest::InvalidateConnection() { - TConnectionPool::Get()->Invalidate(HostName, Connection); - Connection.Reset(); + TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_); + Connection_.Reset(); } //////////////////////////////////////////////////////////////////////////////// |