aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/http.cpp
diff options
context:
space:
mode:
authorermolovd <ermolovd@yandex-team.com>2024-11-19 10:54:24 +0300
committerermolovd <ermolovd@yandex-team.com>2024-11-19 11:06:45 +0300
commit7be946f684e606f5baa9e3a13401cc19b8ac9a3b (patch)
treea4d6ce2a580ba953a048a64bdedbda1f6c7a1d26 /yt/cpp/mapreduce/http/http.cpp
parent907667c6ffd6222845fe8e18fd548e508250b5d0 (diff)
downloadydb-7be946f684e606f5baa9e3a13401cc19b8ac9a3b.tar.gz
YT-22943: add context for system errors
* thrown system errors contain context (i.e. host method and request id that produced error) Type: feature Component: cpp-sdk commit_hash:af72a3a37785e9e373e816c2cc072df2076f821d
Diffstat (limited to 'yt/cpp/mapreduce/http/http.cpp')
-rw-r--r--yt/cpp/mapreduce/http/http.cpp223
1 files changed, 150 insertions, 73 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index 12aa33ff29..ca243a929a 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -10,6 +10,7 @@
#include <yt/cpp/mapreduce/interface/config.h>
#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
#include <yt/yt/core/http/http.h>
@@ -39,6 +40,27 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+std::exception_ptr WrapSystemError(
+ const TRequestContext& context,
+ const std::exception& ex)
+{
+ if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) {
+ return std::make_exception_ptr(errorResponse);
+ }
+
+ auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method);
+ TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, {
+ {"request_id", context.RequestId},
+ {"host", context.HostName},
+ {"method", context.Method},
+ });
+ TTransportError errorResponse(std::move(outer));
+
+ return std::make_exception_ptr(errorResponse);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
class THttpRequest::TRequestStream
: public IOutputStream
{
@@ -92,17 +114,17 @@ private:
CheckErrorState();
try {
func();
- } catch (const std::exception&) {
- HandleWriteException();
+ } catch (const std::exception& ex) {
+ HandleWriteException(ex);
}
}
// In many cases http proxy stops reading request and resets connection
// if error has happend. This function tries to read error response
// in such cases.
- void HandleWriteException() {
+ void HandleWriteException(const std::exception& ex) {
Y_ABORT_UNLESS(WriteError_ == nullptr);
- WriteError_ = std::current_exception();
+ WriteError_ = WrapSystemError(HttpRequest_->Context_, ex);
Y_ABORT_UNLESS(WriteError_ != nullptr);
try {
HttpRequest_->GetResponseStream();
@@ -667,33 +689,81 @@ SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
////////////////////////////////////////////////////////////////////////////////
-static TMaybe<TString> GetProxyName(const THttpInput& input)
+class THttpResponse::THttpInputWrapped
+ : public IInputStream
{
- if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) {
- return proxyHeader->Value();
+public:
+ explicit THttpInputWrapped(TRequestContext context, IInputStream* input)
+ : Context_(std::move(context))
+ , HttpInput_(input)
+ { }
+
+ const THttpHeaders& Headers() const noexcept
+ {
+ return HttpInput_.Headers();
+ }
+
+ const TString& FirstLine() const noexcept
+ {
+ return HttpInput_.FirstLine();
}
- return Nothing();
-}
+
+ bool IsKeepAlive() const noexcept
+ {
+ return HttpInput_.IsKeepAlive();
+ }
+
+ const TMaybe<THttpHeaders>& Trailers() const noexcept
+ {
+ return HttpInput_.Trailers();
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ try {
+ return HttpInput_.Read(buf, len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ try {
+ return HttpInput_.Skip(len);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
+ }
+
+private:
+ const TRequestContext Context_;
+ THttpInput HttpInput_;
+};
THttpResponse::THttpResponse(
- IInputStream* socketStream,
- const TString& requestId,
- const TString& hostName)
- : RequestId_(requestId)
- , HttpInput_(socketStream)
- , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
- , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
+ TRequestContext context,
+ IInputStream* socketStream)
+ : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream))
+ , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
+ , Context_(std::move(context))
{
- HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine());
+ if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) {
+ Context_.HostName = proxyHeader->Value();
+ }
+ HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine());
if (HttpCode_ == 200 || HttpCode_ == 202) {
return;
}
- ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_);
+ ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId);
auto logAndSetError = [&] (const TString& rawError) {
YT_LOG_ERROR("RSP %v - HTTP %v - %v",
- RequestId_,
+ Context_.RequestId,
HttpCode_,
rawError.data());
ErrorResponse_->SetRawError(rawError);
@@ -705,26 +775,26 @@ THttpResponse::THttpResponse(
break;
case 500:
- logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_);
+ logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName);
break;
default: {
TStringStream httpHeaders;
httpHeaders << "HTTP headers (";
- for (const auto& header : HttpInput_.Headers()) {
+ for (const auto& header : HttpInput_->Headers()) {
httpHeaders << header.Name() << ": " << header.Value() << "; ";
}
httpHeaders << ")";
auto errorString = Sprintf("RSP %s - HTTP %d - %s",
- RequestId_.data(),
+ Context_.RequestId.data(),
HttpCode_,
httpHeaders.Str().data());
YT_LOG_ERROR("%v",
errorString.data());
- if (auto parsedResponse = ParseError(HttpInput_.Headers())) {
+ if (auto parsedResponse = ParseError(HttpInput_->Headers())) {
ErrorResponse_ = parsedResponse.GetRef();
} else {
ErrorResponse_->SetRawError(
@@ -735,9 +805,12 @@ THttpResponse::THttpResponse(
}
}
+THttpResponse::~THttpResponse()
+{ }
+
const THttpHeaders& THttpResponse::Headers() const
{
- return HttpInput_.Headers();
+ return HttpInput_->Headers();
}
void THttpResponse::CheckErrorResponse() const
@@ -759,19 +832,19 @@ int THttpResponse::GetHttpCode() const
const TString& THttpResponse::GetHostName() const
{
- return HostName_;
+ return Context_.HostName;
}
bool THttpResponse::IsKeepAlive() const
{
- return HttpInput_.IsKeepAlive();
+ return HttpInput_->IsKeepAlive();
}
TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
{
for (const auto& header : headers) {
if (header.Name() == "X-YT-Error") {
- TErrorResponse errorResponse(HttpCode_, RequestId_);
+ TErrorResponse errorResponse(HttpCode_, Context_.RequestId);
errorResponse.ParseFromJsonError(header.Value());
if (errorResponse.IsOk()) {
return Nothing();
@@ -788,14 +861,14 @@ size_t THttpResponse::DoRead(void* buf, size_t len)
if (Unframe_) {
read = UnframeRead(buf, len);
} else {
- read = HttpInput_.Read(buf, len);
+ read = HttpInput_->Read(buf, len);
}
if (read == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return read;
@@ -807,14 +880,14 @@ size_t THttpResponse::DoSkip(size_t len)
if (Unframe_) {
skipped = UnframeSkip(len);
} else {
- skipped = HttpInput_.Skip(len);
+ skipped = HttpInput_->Skip(len);
}
if (skipped == 0 && len != 0) {
// THttpInput MUST return defined (but may be empty)
// trailers when it is exhausted.
- Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
+ Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
"trailers MUST be defined for exhausted stream");
- CheckTrailers(HttpInput_.Trailers().GetRef());
+ CheckTrailers(HttpInput_->Trailers().GetRef());
IsExhausted_ = true;
}
return skipped;
@@ -825,13 +898,13 @@ void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
if (auto errorResponse = ParseError(trailers)) {
errorResponse->SetIsFromTrailers(true);
YT_LOG_ERROR("RSP %v - %v",
- RequestId_,
+ Context_.RequestId,
errorResponse.GetRef().what());
ythrow errorResponse.GetRef();
}
}
-static ui32 ReadDataFrameSize(THttpInput* stream)
+static ui32 ReadDataFrameSize(IInputStream* stream)
{
ui32 littleEndianSize;
auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
@@ -846,7 +919,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
{
while (RemainingFrameSize_ == 0) {
ui8 frameTypeByte;
- auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte));
+ auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte));
if (read == 0) {
return false;
}
@@ -855,7 +928,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
case EFrameType::KeepAlive:
break;
case EFrameType::Data:
- RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_);
+ RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get());
break;
default:
ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
@@ -869,7 +942,7 @@ size_t THttpResponse::UnframeRead(void* buf, size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_));
+ auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= read;
return read;
}
@@ -879,19 +952,22 @@ size_t THttpResponse::UnframeSkip(size_t len)
if (!RefreshFrameIfNecessary()) {
return 0;
}
- auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_));
+ auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_));
RemainingFrameSize_ -= skipped;
return skipped;
}
////////////////////////////////////////////////////////////////////////////////
-THttpRequest::THttpRequest()
- : RequestId_(CreateGuidAsString())
-{ }
-
-THttpRequest::THttpRequest(const TString& requestId)
- : RequestId_(requestId)
+THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout)
+ : Context_(TRequestContext{
+ .RequestId = std::move(requestId),
+ .HostName = std::move(hostName),
+ .Method = header.GetUrl(/*needProxy=*/ false)
+ })
+ , Header_(std::move(header))
+ , Url_(Header_.GetUrl(true))
+ , SocketTimeout_(socketTimeout)
{ }
THttpRequest::~THttpRequest()
@@ -905,40 +981,41 @@ THttpRequest::~THttpRequest()
// Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
TConnectionPool::Get()->Release(Connection_);
} else {
- TConnectionPool::Get()->Invalidate(HostName_, Connection_);
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
}
}
TString THttpRequest::GetRequestId() const
{
- return RequestId_;
+ return Context_.RequestId;
}
-void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
+IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
{
- HostName_ = std::move(hostName);
YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
- RequestId_,
- HostName_);
+ Context_.RequestId,
+ Context_.HostName);
StartTime_ = TInstant::Now();
- Connection_ = TConnectionPool::Get()->Connect(HostName_, socketTimeout);
+
+ try {
+ Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_);
+ } catch (const std::exception& ex) {
+ auto wrapped = WrapSystemError(Context_, ex);
+ std::rethrow_exception(wrapped);
+ }
YT_LOG_DEBUG("REQ %v - connection #%v",
- RequestId_,
+ Context_.RequestId,
Connection_->Id);
-}
-IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
-{
- auto strHeader = header.GetHeaderAsString(HostName_, RequestId_, includeParameters);
- Url_ = header.GetUrl(true);
+ auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters);
- LogRequest(header, Url_, includeParameters, RequestId_, HostName_);
+ LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName);
- LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
+ LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128);
- auto outputFormat = header.GetOutputFormat();
+ auto outputFormat = Header_.GetOutputFormat();
if (outputFormat && outputFormat->IsTextYson()) {
LogResponse_ = true;
}
@@ -949,9 +1026,9 @@ IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool in
return RequestStream_.Get();
}
-IOutputStream* THttpRequest::StartRequest(const THttpHeader& header)
+IOutputStream* THttpRequest::StartRequest()
{
- return StartRequestImpl(header, true);
+ return StartRequestImpl(true);
}
void THttpRequest::FinishRequest()
@@ -960,16 +1037,16 @@ void THttpRequest::FinishRequest()
RequestStream_->Finish();
}
-void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body)
+void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
{
- if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
- const auto& parameters = header.GetParameters();
+ if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) {
+ const auto& parameters = Header_.GetParameters();
auto parametersStr = NodeToYsonString(parameters);
- auto* output = StartRequestImpl(header, false);
+ auto* output = StartRequestImpl(false);
output->Write(parametersStr);
FinishRequest();
} else {
- auto* output = StartRequest(header);
+ auto* output = StartRequest();
if (body) {
output->Write(*body);
}
@@ -983,9 +1060,9 @@ THttpResponse* THttpRequest::GetResponseStream()
SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
if (TConfig::Get()->UseAbortableResponse) {
Y_ABORT_UNLESS(!Url_.empty());
- Input_.Reset(new TAbortableHttpResponse(SocketInput_.Get(), RequestId_, HostName_, Url_));
+ Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_));
} else {
- Input_.Reset(new THttpResponse(SocketInput_.Get(), RequestId_, HostName_));
+ Input_.Reset(new THttpResponse(Context_, SocketInput_.Get()));
}
Input_->CheckErrorResponse();
}
@@ -1005,12 +1082,12 @@ TString THttpRequest::GetResponse()
if (LogResponse_) {
constexpr auto sizeLimit = 1 << 7;
YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
- RequestId_,
+ Context_.RequestId,
TruncateForLogs(result, sizeLimit),
loggedAttributes.Str());
} else {
YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
- RequestId_,
+ Context_.RequestId,
result.size(),
loggedAttributes.Str());
}
@@ -1023,7 +1100,7 @@ int THttpRequest::GetHttpCode() {
void THttpRequest::InvalidateConnection()
{
- TConnectionPool::Get()->Invalidate(HostName_, Connection_);
+ TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
Connection_.Reset();
}