aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-11-20 11:14:58 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-11-20 11:14:58 +0000
commit31773f157bf8164364649b5f470f52dece0a4317 (patch)
tree33d0f7eef45303ab68cf08ab381ce5e5e36c5240 /yt/cpp/mapreduce/http
parent2c7938962d8689e175574fc1e817c05049f27905 (diff)
parenteff600952d5dfe17942f38f510a8ac2b203bb3a5 (diff)
downloadydb-31773f157bf8164364649b5f470f52dece0a4317.tar.gz
Merge branch 'rightlib' into mergelibs-241120-1113
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp13
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.h3
-rw-r--r--yt/cpp/mapreduce/http/http.cpp340
-rw-r--r--yt/cpp/mapreduce/http/http.h87
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp16
-rw-r--r--yt/cpp/mapreduce/http/http_client.h2
-rw-r--r--yt/cpp/mapreduce/http/requests.cpp1
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp2
-rw-r--r--yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp21
-rw-r--r--yt/cpp/mapreduce/http/ut/http_ut.cpp10
10 files changed, 285 insertions, 210 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
index 9da9241d33..995bb9de4c 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.cpp
+++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp
@@ -14,20 +14,20 @@ public:
{
auto g = Guard(Lock_);
auto id = NextId_++;
- IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
+ IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
return id;
}
void StopOutage(TOutageId id)
{
auto g = Guard(Lock_);
- IdToOutage.erase(id);
+ IdToOutage_.erase(id);
}
void Add(IAbortableHttpResponse* response)
{
auto g = Guard(Lock_);
- for (auto& [id, entry] : IdToOutage) {
+ for (auto& [id, entry] : IdToOutage_) {
if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
response->SetLengthLimit(entry.LengthLimit);
entry.Counter -= 1;
@@ -70,7 +70,7 @@ private:
private:
TOutageId NextId_ = 0;
TIntrusiveList<IAbortableHttpResponse> ResponseList_;
- THashMap<TOutageId, TOutageEntry> IdToOutage;
+ THashMap<TOutageId, TOutageEntry> IdToOutage_;
TMutex Lock_;
};
@@ -137,11 +137,10 @@ bool TAbortableHttpResponseBase::IsAborted() const
////////////////////////////////////////////////////////////////////////////////
TAbortableHttpResponse::TAbortableHttpResponse(
+ TRequestContext context,
IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName,
const TString& url)
- : THttpResponse(socketStream, requestId, hostName)
+ : THttpResponse(std::move(context), socketStream)
, TAbortableHttpResponseBase(url)
{
}
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h
index d72bcfa0a6..e9b1483bf7 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.h
+++ b/yt/cpp/mapreduce/http/abortable_http_response.h
@@ -108,9 +108,8 @@ public:
public:
TAbortableHttpResponse(
+ TRequestContext context,
IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName,
const TString& url);
/// @brief Abort any responses which match `urlPattern` (i.e. contain it in url).
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index f9eb8539b5..ca243a929a 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -10,6 +10,7 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/core/http/http.h>
@@ -39,6 +40,27 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+std::exception_ptr WrapSystemError(
+ const TRequestContext& context,
+ const std::exception& ex)
+{
+ if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) {
+ return std::make_exception_ptr(errorResponse);
+ }
+
+ auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method);
+ TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, {
+ {"request_id", context.RequestId},
+ {"host", context.HostName},
+ {"method", context.Method},
+ });
+ TTransportError errorResponse(std::move(outer));
+
+ return std::make_exception_ptr(errorResponse);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
class THttpRequest::TRequestStream
: public IOutputStream
{
@@ -92,17 +114,17 @@ private:
CheckErrorState();
try {
func();
- } catch (const std::exception&) {
- HandleWriteException();
+ } catch (const std::exception& ex) {
+ HandleWriteException(ex);
}
}
// In many cases http proxy stops reading request and resets connection
// if error has happend. This function tries to read error response
// in such cases.
- void HandleWriteException() {
+ void HandleWriteException(const std::exception& ex) {
Y_ABORT_UNLESS(WriteError_ == nullptr);
- WriteError_ = std::current_exception();
+ WriteError_ = WrapSystemError(HttpRequest_->Context_, ex);
Y_ABORT_UNLESS(WriteError_ != nullptr);
try {
HttpRequest_->GetResponseStream();
@@ -130,16 +152,16 @@ private:
////////////////////////////////////////////////////////////////////////////////
THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
- : Method(method)
- , Command(command)
- , IsApi(isApi)
+ : Method_(method)
+ , Command_(command)
+ , IsApi_(isApi)
{ }
void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
{
- auto it = Parameters.find(key);
- if (it == Parameters.end()) {
- Parameters.emplace(key, std::move(value));
+ auto it = Parameters_.find(key);
+ if (it == Parameters_.end()) {
+ Parameters_.emplace(key, std::move(value));
} else {
if (overwrite) {
it->second = std::move(value);
@@ -158,12 +180,12 @@ void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
void THttpHeader::RemoveParameter(const TString& key)
{
- Parameters.erase(key);
+ Parameters_.erase(key);
}
TNode THttpHeader::GetParameters() const
{
- return Parameters;
+ return Parameters_;
}
void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
@@ -202,81 +224,81 @@ void THttpHeader::AddMutationId()
bool THttpHeader::HasMutationId() const
{
- return Parameters.contains("mutation_id");
+ return Parameters_.contains("mutation_id");
}
void THttpHeader::SetToken(const TString& token)
{
- Token = token;
+ Token_ = token;
}
void THttpHeader::SetProxyAddress(const TString& proxyAddress)
{
- ProxyAddress = proxyAddress;
+ ProxyAddress_ = proxyAddress;
}
void THttpHeader::SetHostPort(const TString& hostPort)
{
- HostPort = hostPort;
+ HostPort_ = hostPort;
}
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
{
- ImpersonationUser = impersonationUser;
+ ImpersonationUser_ = impersonationUser;
}
void THttpHeader::SetServiceTicket(const TString& ticket)
{
- ServiceTicket = ticket;
+ ServiceTicket_ = ticket;
}
void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
{
- InputFormat = format;
+ InputFormat_ = format;
}
void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
{
- OutputFormat = format;
+ OutputFormat_ = format;
}
TMaybe<TFormat> THttpHeader::GetOutputFormat() const
{
- return OutputFormat;
+ return OutputFormat_;
}
void THttpHeader::SetRequestCompression(const TString& compression)
{
- RequestCompression = compression;
+ RequestCompression_ = compression;
}
void THttpHeader::SetResponseCompression(const TString& compression)
{
- ResponseCompression = compression;
+ ResponseCompression_ = compression;
}
TString THttpHeader::GetCommand() const
{
- return Command;
+ return Command_;
}
TString THttpHeader::GetUrl(bool needProxy) const
{
TStringStream url;
- if (needProxy && !ProxyAddress.empty()) {
- url << ProxyAddress << "/";
+ if (needProxy && !ProxyAddress_.empty()) {
+ url << ProxyAddress_ << "/";
return url.Str();
}
- if (!ProxyAddress.empty()) {
- url << HostPort;
+ if (!ProxyAddress_.empty()) {
+ url << HostPort_;
}
- if (IsApi) {
- url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
+ if (IsApi_) {
+ url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
} else {
- url << "/" << Command;
+ url << "/" << Command_;
}
return url.Str();
@@ -284,16 +306,16 @@ TString THttpHeader::GetUrl(bool needProxy) const
bool THttpHeader::ShouldAcceptFraming() const
{
- return TConfig::Get()->CommandsWithFraming.contains(Command);
+ return TConfig::Get()->CommandsWithFraming.contains(Command_);
}
TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
{
TStringStream result;
- result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
+ result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
- GetHeader(HostPort.empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
+ GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
if (ShouldAcceptFraming()) {
result << "X-YT-Accept-Framing: 1\r\n";
@@ -311,25 +333,25 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
headers->Add("Host", hostName);
headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
- if (!Token.empty()) {
- headers->Add("Authorization", "OAuth " + Token);
+ if (!Token_.empty()) {
+ headers->Add("Authorization", "OAuth " + Token_);
}
- if (!ServiceTicket.empty()) {
- headers->Add("X-Ya-Service-Ticket", ServiceTicket);
+ if (!ServiceTicket_.empty()) {
+ headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
}
- if (!ImpersonationUser.empty()) {
- headers->Add("X-Yt-User-Name", ImpersonationUser);
+ if (!ImpersonationUser_.empty()) {
+ headers->Add("X-Yt-User-Name", ImpersonationUser_);
}
- if (Method == "PUT" || Method == "POST") {
+ if (Method_ == "PUT" || Method_ == "POST") {
headers->Add("Transfer-Encoding", "chunked");
}
headers->Add("X-YT-Correlation-Id", requestId);
headers->Add("X-YT-Header-Format", "<format=text>yson");
- headers->Add("Content-Encoding", RequestCompression);
- headers->Add("Accept-Encoding", ResponseCompression);
+ headers->Add("Content-Encoding", RequestCompression_);
+ headers->Add("Accept-Encoding", ResponseCompression_);
auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
static const size_t maxHttpHeaderSize = 64 << 10;
@@ -353,14 +375,14 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
} while (ptr != finish);
};
- if (InputFormat) {
- printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
+ if (InputFormat_) {
+ printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
}
- if (OutputFormat) {
- printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
+ if (OutputFormat_) {
+ printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
}
if (includeParameters) {
- printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
+ printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
}
return NHttp::THeadersPtrWrapper(std::move(headers));
@@ -368,7 +390,7 @@ NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const
const TString& THttpHeader::GetMethod() const
{
- return Method;
+ return Method_;
}
////////////////////////////////////////////////////////////////////////////////
@@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
////////////////////////////////////////////////////////////////////////////////
-static TMaybe<TString> GetProxyName(const THttpInput& input)
+class THttpResponse::THttpInputWrapped
+ : public IInputStream
{
- if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) {
- return proxyHeader->Value();
+public:
+ explicit THttpInputWrapped(TRequestContext context, IInputStream* input)
+ : Context_(std::move(context))
+ , HttpInput_(input)
+ { }
+
+ const THttpHeaders& Headers() const noexcept
+ {
+ return HttpInput_.Headers();
+ }
+
+ const TString& FirstLine() const noexcept
+ {
+ return HttpInput_.FirstLine();
+ }
+
+ bool IsKeepAlive() const noexcept
+ {
+ return HttpInput_.IsKeepAlive();
+ }
+
+ const TMaybe<THttpHeaders>& Trailers() const noexcept
+ {
+ return HttpInput_.Trailers();
}
- return Nothing();
-}
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ try {
+ return HttpInput_.Read(buf, len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ try {
+ return HttpInput_.Skip(len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+private:
+ const TRequestContext Context_;
+ THttpInput HttpInput_;
+};
THttpResponse::THttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName)
- : HttpInput_(socketStream)
- , RequestId_(requestId)
- , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
- , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
-{
- HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine());
+ TRequestContext context,
+ IInputStream* socketStream)
+ : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream))
+ , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
+ , Context_(std::move(context))
+{
+ if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) {
+ Context_.HostName = proxyHeader->Value();
+ }
+ HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine());
if (HttpCode_ == 200 || HttpCode_ == 202) {
return;
}
- ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_);
+ ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId);
auto logAndSetError = [&] (const TString& rawError) {
YT_LOG_ERROR("RSP %v - HTTP %v - %v",
- RequestId_,
+ Context_.RequestId,
HttpCode_,
rawError.data());
ErrorResponse_->SetRawError(rawError);
@@ -705,26 +775,26 @@ THttpResponse::THttpResponse(
break;
case 500:
- logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_);
+ logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName);
break;
default: {
TStringStream httpHeaders;
httpHeaders << "HTTP headers (";
- for (const auto& header : HttpInput_.Headers()) {
+ for (const auto& header : HttpInput_->Headers()) {
httpHeaders << header.Name() << ": " << header.Value() << "; ";
}
httpHeaders << ")";
auto errorString = Sprintf("RSP %s - HTTP %d - %s",
- RequestId_.data(),
+ Context_.RequestId.data(),
HttpCode_,
httpHeaders.Str().data());
YT_LOG_ERROR("%v",
errorString.data());
- if (auto parsedResponse = ParseError(HttpInput_.Headers())) {
+ if (auto parsedResponse = ParseError(HttpInput_->Headers())) {
ErrorResponse_ = parsedResponse.GetRef();
} else {
ErrorResponse_->SetRawError(
@@ -735,9 +805,12 @@ THttpResponse::THttpResponse(
}
}
+THttpResponse::~THttpResponse()
+{ }
+
const THttpHeaders& THttpResponse::Headers() const
{
- return HttpInput_.Headers();
+ return HttpInput_->Headers();
}
void THttpResponse::CheckErrorResponse() const
@@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const
const TString& THttpResponse::GetHostName() const
{
- return HostName_;
+ return Context_.HostName;
}
bool THttpResponse::IsKeepAlive() const
{
- return HttpInput_.IsKeepAlive();
+ return HttpInput_->IsKeepAlive();
}
TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
{
for (const auto& header : headers) {
if (header.Name() == "X-YT-Error") {
- TErrorResponse errorResponse(HttpCode_, RequestId_);
+ TErrorResponse errorResponse(HttpCode_, Context_.RequestId);
errorResponse.ParseFromJsonError(header.Value());
if (errorResponse.IsOk()) {
return Nothing();
@@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len)
if (Unframe_) {
read = UnframeRead(buf, len);
} else {
- read = HttpInput_.Read(buf, len);
+ read = HttpInput_->Read(buf, len);
}
if (read == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return read;
@@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len)
if (Unframe_) {
skipped = UnframeSkip(len);
} else {
- skipped = HttpInput_.Skip(len);
+ skipped = HttpInput_->Skip(len);
}
if (skipped == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return skipped;
@@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
if (auto errorResponse = ParseError(trailers)) {
errorResponse->SetIsFromTrailers(true);
YT_LOG_ERROR("RSP %v - %v",
- RequestId_,
+ Context_.RequestId,
errorResponse.GetRef().what());
ythrow errorResponse.GetRef();
}
}
-static ui32 ReadDataFrameSize(THttpInput* stream)
+static ui32 ReadDataFrameSize(IInputStream* stream)
{
ui32 littleEndianSize;
auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
@@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
{
while (RemainingFrameSize_ == 0) {
ui8 frameTypeByte;
- auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte));
+ auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte));
if (read == 0) {
return false;
}
@@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
case EFrameType::KeepAlive:
break;
case EFrameType::Data:
- RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_);
+ RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get());
break;
default:
ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
@@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_));
+ auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= read;
return read;
}
@@ -879,80 +952,83 @@ size_t THttpResponse::UnframeSkip(size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_));
+ auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= skipped;
return skipped;
}
////////////////////////////////////////////////////////////////////////////////
-THttpRequest::THttpRequest()
-{
- RequestId = CreateGuidAsString();
-}
-
-THttpRequest::THttpRequest(const TString& requestId)
- : RequestId(requestId)
+THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout)
+ : Context_(TRequestContext{
+ .RequestId = std::move(requestId),
+ .HostName = std::move(hostName),
+ .Method = header.GetUrl(/*needProxy=*/ false)
+ })
+ , Header_(std::move(header))
+ , Url_(Header_.GetUrl(true))
+ , SocketTimeout_(socketTimeout)
{ }
THttpRequest::~THttpRequest()
{
- if (!Connection) {
+ if (!Connection_) {
return;
}
- if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
+ if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
// We should return to the pool only connections where HTTP response was fully read.
// Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
- TConnectionPool::Get()->Release(Connection);
+ TConnectionPool::Get()->Release(Connection_);
} else {
- TConnectionPool::Get()->Invalidate(HostName, Connection);
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
}
}
TString THttpRequest::GetRequestId() const
{
- return RequestId;
+ return Context_.RequestId;
}
-void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
+IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
{
- HostName = std::move(hostName);
YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
- RequestId,
- HostName);
+ Context_.RequestId,
+ Context_.HostName);
StartTime_ = TInstant::Now();
- Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
+
+ try {
+ Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
YT_LOG_DEBUG("REQ %v - connection #%v",
- RequestId,
- Connection->Id);
-}
+ Context_.RequestId,
+ Connection_->Id);
-IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
-{
- auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
- Url_ = header.GetUrl(true);
+ auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters);
- LogRequest(header, Url_, includeParameters, RequestId, HostName);
+ LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName);
- LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
+ LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128);
- auto outputFormat = header.GetOutputFormat();
+ auto outputFormat = Header_.GetOutputFormat();
if (outputFormat && outputFormat->IsTextYson()) {
- LogResponse = true;
+ LogResponse_ = true;
}
- RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
+ RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
RequestStream_->Write(strHeader.data(), strHeader.size());
return RequestStream_.Get();
}
-IOutputStream* THttpRequest::StartRequest(const THttpHeader& header)
+IOutputStream* THttpRequest::StartRequest()
{
- return StartRequestImpl(header, true);
+ return StartRequestImpl(true);
}
void THttpRequest::FinishRequest()
@@ -961,16 +1037,16 @@ void THttpRequest::FinishRequest()
RequestStream_->Finish();
}
-void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body)
+void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
{
- if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
- const auto& parameters = header.GetParameters();
+ if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) {
+ const auto& parameters = Header_.GetParameters();
auto parametersStr = NodeToYsonString(parameters);
- auto* output = StartRequestImpl(header, false);
+ auto* output = StartRequestImpl(false);
output->Write(parametersStr);
FinishRequest();
} else {
- auto* output = StartRequest(header);
+ auto* output = StartRequest();
if (body) {
output->Write(*body);
}
@@ -980,17 +1056,17 @@ void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> bo
THttpResponse* THttpRequest::GetResponseStream()
{
- if (!Input) {
- SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
+ if (!Input_) {
+ SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
if (TConfig::Get()->UseAbortableResponse) {
Y_ABORT_UNLESS(!Url_.empty());
- Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
+ Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_));
} else {
- Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
+ Input_.Reset(new THttpResponse(Context_, SocketInput_.Get()));
}
- Input->CheckErrorResponse();
+ Input_->CheckErrorResponse();
}
- return Input.Get();
+ return Input_.Get();
}
TString THttpRequest::GetResponse()
@@ -1003,15 +1079,15 @@ TString THttpRequest::GetResponse()
<< "HostName: " << GetResponseStream()->GetHostName() << "; "
<< LoggedAttributes_;
- if (LogResponse) {
+ if (LogResponse_) {
constexpr auto sizeLimit = 1 << 7;
YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
- RequestId,
+ Context_.RequestId,
TruncateForLogs(result, sizeLimit),
loggedAttributes.Str());
} else {
YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
- RequestId,
+ Context_.RequestId,
result.size(),
loggedAttributes.Str());
}
@@ -1024,8 +1100,8 @@ int THttpRequest::GetHttpCode() {
void THttpRequest::InvalidateConnection()
{
- TConnectionPool::Get()->Invalidate(HostName, Connection);
- Connection.Reset();
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
+ Connection_.Reset();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 95595959ad..618b1e2c22 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -39,6 +39,12 @@ enum class EFrameType
KeepAlive = 0x02,
};
+struct TRequestContext
+{
+ TString RequestId;
+ TString HostName;
+ TString Method;
+};
class THttpHeader
{
@@ -82,24 +88,23 @@ private:
bool ShouldAcceptFraming() const;
private:
- const TString Method;
- const TString Command;
- const bool IsApi;
-
- TNode::TMapType Parameters;
- TString ImpersonationUser;
- TString Token;
- TString ServiceTicket;
- TNode Attributes;
- TString ProxyAddress;
- TString HostPort;
-
-private:
- TMaybe<TFormat> InputFormat = TFormat::YsonText();
- TMaybe<TFormat> OutputFormat = TFormat::YsonText();
-
- TString RequestCompression = "identity";
- TString ResponseCompression = "identity";
+ const TString Method_;
+ const TString Command_;
+ const bool IsApi_;
+
+ TNode::TMapType Parameters_;
+ TString ImpersonationUser_;
+ TString Token_;
+ TString ServiceTicket_;
+ TNode Attributes_;
+ TString ProxyAddress_;
+ TString HostPort_;
+
+ TMaybe<TFormat> InputFormat_ = TFormat::YsonText();
+ TMaybe<TFormat> OutputFormat_ = TFormat::YsonText();
+
+ TString RequestCompression_ = "identity";
+ TString ResponseCompression_ = "identity";
};
////////////////////////////////////////////////////////////////////////////////
@@ -172,9 +177,10 @@ public:
// 'requestId' and 'hostName' are provided for debug reasons
// (they will appear in some error messages).
THttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName);
+ TRequestContext context,
+ IInputStream* socketStream);
+
+ ~THttpResponse();
const THttpHeaders& Headers() const;
@@ -196,13 +202,17 @@ private:
bool RefreshFrameIfNecessary();
private:
- THttpInput HttpInput_;
- const TString RequestId_;
- const TString HostName_;
+ class THttpInputWrapped;
+
+private:
+ THolder<THttpInputWrapped> HttpInput_;
+
+ const bool Unframe_;
+
+ TRequestContext Context_;
int HttpCode_ = 0;
TMaybe<TErrorResponse> ErrorResponse_;
bool IsExhausted_ = false;
- const bool Unframe_;
size_t RemainingFrameSize_ = 0;
};
@@ -211,18 +221,15 @@ private:
class THttpRequest
{
public:
- THttpRequest();
- THttpRequest(const TString& requestId);
+ THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout);
~THttpRequest();
TString GetRequestId() const;
- void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero());
-
- IOutputStream* StartRequest(const THttpHeader& header);
+ IOutputStream* StartRequest();
void FinishRequest();
- void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body);
+ void SmallRequest(TMaybe<TStringBuf> body);
THttpResponse* GetResponseStream();
@@ -233,26 +240,28 @@ public:
int GetHttpCode();
private:
- IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters);
+ IOutputStream* StartRequestImpl(bool includeParameters);
private:
class TRequestStream;
private:
- TString HostName;
- TString RequestId;
- TString Url_;
+ const TRequestContext Context_;
+ const THttpHeader Header_;
+ const TString Url_;
+ const TDuration SocketTimeout_;
+
TInstant StartTime_;
TString LoggedAttributes_;
- TConnectionPtr Connection;
+ TConnectionPtr Connection_;
THolder<TRequestStream> RequestStream_;
- THolder<TSocketInput> SocketInput;
- THolder<THttpResponse> Input;
+ THolder<TSocketInput> SocketInput_;
+ THolder<THttpResponse> Input_;
- bool LogResponse = false;
+ bool LogResponse_ = false;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 305d95b06c..7e9d761c3c 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -167,23 +167,23 @@ class TDefaultHttpClient
public:
IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override
{
- auto request = std::make_unique<THttpRequest>(requestId);
-
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
+
+ auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- request->SmallRequest(header, body);
+ request->SmallRequest(body);
return std::make_unique<TDefaultHttpResponse>(std::move(request));
}
IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
{
- auto request = std::make_unique<THttpRequest>(requestId);
-
auto urlRef = NHttp::ParseUrl(url);
+ auto host = CreateHost(urlRef.Host, urlRef.PortStr);
+
+ auto request = std::make_unique<THttpRequest>(requestId, host, header, config.SocketTimeout);
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- auto stream = request->StartRequest(header);
+ auto stream = request->StartRequest();
return std::make_unique<TDefaultHttpRequest>(std::move(request), stream);
}
};
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index 97321c4c9d..6087eca098 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -11,8 +11,6 @@
#include <util/stream/fwd.h>
-#include <memory>
-
namespace NYT::NHttpClient {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp
index 7d95a10bc2..cd610d6493 100644
--- a/yt/cpp/mapreduce/http/requests.cpp
+++ b/yt/cpp/mapreduce/http/requests.cpp
@@ -2,7 +2,6 @@
#include "context.h"
#include "host_manager.h"
-#include "retry_request.h"
#include <yt/cpp/mapreduce/client/transaction.h>
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 307e310b5b..0c719eeda3 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -114,7 +114,7 @@ TResponseInfo RetryRequestWithPolicy(
return Request(context, header, body, requestId, config);
} catch (const TErrorResponse& e) {
- LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription());
+ LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription());
retryWithSameMutationId = e.IsTransportError();
if (!IsRetriable(e)) {
diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
index 90196246c5..fa072675fb 100644
--- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
@@ -72,11 +72,10 @@ THolder<TSimpleServer> CreateProxyHttpServer()
ui64 port;
ParseFirstLine(inputStr, method, host, port, command);
- THttpRequest request;
const TString hostName = ::TStringBuilder() << host << ":" << port;
- request.Connect(hostName);
auto header = THttpHeader(method, command);
- request.StartRequest(header);
+ THttpRequest request("0-0-0-0", hostName, header, TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
auto res = request.GetResponseStream();
THttpOutput httpOutput(output);
@@ -138,9 +137,8 @@ TEST(TConnectionPool, TestReleaseUnread)
const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
for (size_t i = 0; i != 10; ++i) {
- THttpRequest request;
- request.Connect(hostName);
- request.StartRequest(THttpHeader("GET", "foo"));
+ THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
request.GetResponseStream();
}
@@ -155,12 +153,12 @@ TEST(TConnectionPool, TestProxy)
const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort();
for (size_t i = 0; i != 10; ++i) {
- THttpRequest request;
- request.Connect(hostName2);
auto header = THttpHeader("GET", "foo");
header.SetProxyAddress(hostName2);
header.SetHostPort(hostName);
- request.StartRequest(header);
+
+ THttpRequest request("0-0-0-0", hostName2, header, TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
request.GetResponseStream();
}
@@ -176,9 +174,8 @@ TEST(TConnectionPool, TestConcurrency)
const auto func = [&] {
for (int i = 0; i != 100; ++i) {
- THttpRequest request;
- request.Connect(hostName);
- request.StartRequest(THttpHeader("GET", "foo"));
+ THttpRequest request("0-0-0-0", hostName, THttpHeader("GET", "foo"), TDuration::Zero());
+ request.StartRequest();
request.FinishRequest();
auto res = request.GetResponseStream();
res->ReadAll();
diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp
index ca260841d0..e41e83c5a0 100644
--- a/yt/cpp/mapreduce/http/ut/http_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp
@@ -70,9 +70,8 @@ TEST(TFramingTest, FramingSimple)
{
auto server = CreateFramingEchoServer();
- THttpRequest request;
- request.Connect(server->GetAddress());
- auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero());
+ auto requestStream = request.StartRequest();
*requestStream << "Some funny data";
request.FinishRequest();
auto response = request.GetResponseStream()->ReadAll();
@@ -83,9 +82,8 @@ TEST(TFramingTest, FramingLarge)
{
auto server = CreateFramingEchoServer();
- THttpRequest request;
- request.Connect(server->GetAddress());
- auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ THttpRequest request("0-0-0-0", server->GetAddress(), THttpHeader("POST", "concatenate"), TDuration::Zero());
+ auto requestStream = request.StartRequest();
auto data = TString(100000, 'x');
*requestStream << data;
request.FinishRequest();