diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/http/http.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/http/http.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/http.cpp | 1014 |
1 files changed, 1014 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp new file mode 100644 index 0000000000..d44b2638a0 --- /dev/null +++ b/yt/cpp/mapreduce/http/http.cpp @@ -0,0 +1,1014 @@ +#include "http.h" + +#include "abortable_http_response.h" +#include "core.h" +#include "helpers.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/yt/core/http/http.h> + +#include <library/cpp/json/json_writer.h> + +#include <library/cpp/string_utils/base64/base64.h> +#include <library/cpp/string_utils/quote/quote.h> + +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> + +#include <util/stream/mem.h> + +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/string/escape.h> +#include <util/string/printf.h> + +#include <util/system/byteorder.h> +#include <util/system/getpid.h> + +#include <exception> + + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRequest::TRequestStream + : public IOutputStream +{ +public: + TRequestStream(THttpRequest* httpRequest, const TSocket& s) + : HttpRequest_(httpRequest) + , SocketOutput_(s) + , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_)) + { + HttpOutput_.EnableKeepAlive(true); + } + +private: + void DoWrite(const void* buf, size_t len) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(buf, len); + }); + } + + void DoWriteV(const TPart* parts, size_t count) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(parts, count); + }); + } + + void DoWriteC(char ch) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(ch); + }); + } + + void DoFlush() override + { + WrapWriteFunc([&] { + HttpOutput_.Flush(); + }); + } + + void DoFinish() override + { + WrapWriteFunc([&] { + HttpOutput_.Finish(); + }); + } + + void WrapWriteFunc(std::function<void()> func) + { + CheckErrorState(); + try { + func(); + } catch (const std::exception&) { + HandleWriteException(); + } + } + + // In many cases http proxy stops reading request and resets connection + // if error has happend. This function tries to read error response + // in such cases. + void HandleWriteException() { + Y_VERIFY(WriteError_ == nullptr); + WriteError_ = std::current_exception(); + Y_VERIFY(WriteError_ != nullptr); + try { + HttpRequest_->GetResponseStream(); + } catch (const TErrorResponse &) { + throw; + } catch (...) { + } + std::rethrow_exception(WriteError_); + } + + void CheckErrorState() + { + if (WriteError_) { + std::rethrow_exception(WriteError_); + } + } + +private: + THttpRequest* const HttpRequest_; + TSocketOutput SocketOutput_; + THttpOutput HttpOutput_; + std::exception_ptr WriteError_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) + : Method(method) + , Command(command) + , IsApi(isApi) +{ } + +void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) +{ + auto it = Parameters.find(key); + if (it == Parameters.end()) { + Parameters.emplace(key, std::move(value)); + } else { + if (overwrite) { + it->second = std::move(value); + } else { + ythrow yexception() << "Duplicate key: " << key; + } + } +} + +void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) +{ + for (const auto& p : newParameters.AsMap()) { + AddParameter(p.first, p.second, overwrite); + } +} + +void THttpHeader::RemoveParameter(const TString& key) +{ + Parameters.erase(key); +} + +TNode THttpHeader::GetParameters() const +{ + return Parameters; +} + +void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) +{ + if (transactionId) { + AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite); + } else { + RemoveParameter("transaction_id"); + } +} + +void THttpHeader::AddPath(const TString& path, bool overwrite) +{ + AddParameter("path", path, overwrite); +} + +void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite) +{ + AddParameter("operation_id", GetGuidAsString(operationId), overwrite); +} + +void THttpHeader::AddMutationId() +{ + TGUID guid; + + // Some users use `fork()' with yt wrapper + // (actually they use python + multiprocessing) + // and CreateGuid is not resistant to `fork()', so spice it a little bit. + // + // Check IGNIETFERRO-610 + CreateGuid(&guid); + guid.dw[2] = GetPID() ^ MicroSeconds(); + + AddParameter("mutation_id", GetGuidAsString(guid), true); +} + +bool THttpHeader::HasMutationId() const +{ + return Parameters.contains("mutation_id"); +} + +void THttpHeader::SetToken(const TString& token) +{ + Token = token; +} + +void THttpHeader::SetImpersonationUser(const TString& impersonationUser) +{ + ImpersonationUser = impersonationUser; +} + +void THttpHeader::SetServiceTicket(const TString& ticket) +{ + ServiceTicket = ticket; +} + +void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) +{ + InputFormat = format; +} + +void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) +{ + OutputFormat = format; +} + +TMaybe<TFormat> THttpHeader::GetOutputFormat() const +{ + return OutputFormat; +} + +void THttpHeader::SetRequestCompression(const TString& compression) +{ + RequestCompression = compression; +} + +void THttpHeader::SetResponseCompression(const TString& compression) +{ + ResponseCompression = compression; +} + +TString THttpHeader::GetCommand() const +{ + return Command; +} + +TString THttpHeader::GetUrl() const +{ + TStringStream url; + + if (IsApi) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + } else { + url << "/" << Command; + } + + return url.Str(); +} + +bool THttpHeader::ShouldAcceptFraming() const +{ + return TConfig::Get()->CommandsWithFraming.contains(Command); +} + +TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const +{ + TStringStream result; + + result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + + GetHeader(hostName, requestId, includeParameters).Get()->WriteTo(&result); + + if (ShouldAcceptFraming()) { + result << "X-YT-Accept-Framing: 1\r\n"; + } + + result << "\r\n"; + + return result.Str(); +} + +NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const +{ + auto headers = New<NHttp::THeaders>(); + + headers->Add("Host", hostName); + headers->Add("User-Agent", TProcessState::Get()->ClientVersion); + + if (!Token.empty()) { + headers->Add("Authorization", "OAuth " + Token); + } + if (!ServiceTicket.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket); + } + if (!ImpersonationUser.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser); + } + + if (Method == "PUT" || Method == "POST") { + headers->Add("Transfer-Encoding", "chunked"); + } + + headers->Add("X-YT-Correlation-Id", requestId); + headers->Add("X-YT-Header-Format", "<format=text>yson"); + + headers->Add("Content-Encoding", RequestCompression); + headers->Add("Accept-Encoding", ResponseCompression); + + auto printYTHeader = [&headers] (const char* headerName, const TString& value) { + static const size_t maxHttpHeaderSize = 64 << 10; + if (!value) { + return; + } + if (value.size() <= maxHttpHeaderSize) { + headers->Add(headerName, value); + return; + } + + TString encoded; + Base64Encode(value, encoded); + auto ptr = encoded.data(); + auto finish = encoded.data() + encoded.size(); + size_t index = 0; + do { + auto end = Min(ptr + maxHttpHeaderSize, finish); + headers->Add(Format("%v%v", headerName, index++), TString(ptr, end)); + ptr = end; + } while (ptr != finish); + }; + + if (InputFormat) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + } + if (OutputFormat) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + } + if (includeParameters) { + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + } + + return NHttp::THeadersPtrWrapper(std::move(headers)); +} + +const TString& THttpHeader::GetMethod() const +{ + return Method; +} + +//////////////////////////////////////////////////////////////////////////////// + +TAddressCache* TAddressCache::Get() +{ + return Singleton<TAddressCache>(); +} + +bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address) +{ + if (!TConfig::Get()->ForceIpV4 && !TConfig::Get()->ForceIpV6) { + return true; + } + + for (auto i = address->Begin(); i != address->End(); ++i) { + const auto& addressInfo = *i; + if (TConfig::Get()->ForceIpV4 && addressInfo.ai_family == AF_INET) { + return true; + } + if (TConfig::Get()->ForceIpV6 && addressInfo.ai_family == AF_INET6) { + return true; + } + } + return false; +} + +TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName) +{ + auto address = FindAddress(hostName); + if (address) { + return address; + } + + TString host(hostName); + ui16 port = 80; + + auto colon = hostName.find(':'); + if (colon != TString::npos) { + port = FromString<ui16>(hostName.substr(colon + 1)); + host = hostName.substr(0, colon); + } + + auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get()); + auto error = yexception() << "can not resolve address of required version for host " << hostName; + while (true) { + address = new TNetworkAddress(host, port); + if (ContainsAddressOfRequiredVersion(address)) { + break; + } + retryPolicy->NotifyNewAttempt(); + YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v", + hostName, + retryPolicy->GetAttemptDescription()); + if (auto backoffDuration = retryPolicy->OnGenericError(error)) { + NDetail::TWaitProxy::Get()->Sleep(*backoffDuration); + } else { + ythrow error; + } + } + + AddAddress(hostName, address); + return address; +} + +TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const +{ + TCacheEntry entry; + { + TReadGuard guard(Lock_); + auto it = Cache_.find(hostName); + if (it == Cache_.end()) { + return nullptr; + } + entry = it->second; + } + + if (TInstant::Now() > entry.ExpirationTime) { + YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution", + hostName); + return nullptr; + } + + if (!ContainsAddressOfRequiredVersion(entry.Address)) { + YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution", + hostName); + return nullptr; + } + + return entry.Address; +} + +void TAddressCache::AddAddress(TString hostName, TAddressPtr address) +{ + auto entry = TCacheEntry{ + .Address = std::move(address), + .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout, + }; + + { + TWriteGuard guard(Lock_); + Cache_.emplace(std::move(hostName), std::move(entry)); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TConnectionPool* TConnectionPool::Get() +{ + return Singleton<TConnectionPool>(); +} + +TConnectionPtr TConnectionPool::Connect( + const TString& hostName, + TDuration socketTimeout) +{ + Refresh(); + + if (socketTimeout == TDuration::Zero()) { + socketTimeout = TConfig::Get()->SocketTimeout; + } + + { + auto guard = Guard(Lock_); + auto now = TInstant::Now(); + auto range = Connections_.equal_range(hostName); + for (auto it = range.first; it != range.second; ++it) { + auto& connection = it->second; + if (connection->DeadLine < now) { + continue; + } + if (!AtomicCas(&connection->Busy, 1, 0)) { + continue; + } + + connection->DeadLine = now + socketTimeout; + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + return connection; + } + } + + TConnectionPtr connection(new TConnection); + + auto networkAddress = TAddressCache::Get()->Resolve(hostName); + TSocketHolder socket(DoConnect(networkAddress)); + SetNonBlock(socket, false); + + connection->Socket.Reset(new TSocket(socket.Release())); + + connection->DeadLine = TInstant::Now() + socketTimeout; + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + + { + auto guard = Guard(Lock_); + static ui32 connectionId = 0; + connection->Id = ++connectionId; + Connections_.insert({hostName, connection}); + } + + YT_LOG_DEBUG("New connection to %v #%v opened", + hostName, + connection->Id); + + return connection; +} + +void TConnectionPool::Release(TConnectionPtr connection) +{ + auto socketTimeout = TConfig::Get()->SocketTimeout; + auto newDeadline = TInstant::Now() + socketTimeout; + + { + auto guard = Guard(Lock_); + connection->DeadLine = newDeadline; + } + + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + AtomicSet(connection->Busy, 0); + + Refresh(); +} + +void TConnectionPool::Invalidate( + const TString& hostName, + TConnectionPtr connection) +{ + auto guard = Guard(Lock_); + auto range = Connections_.equal_range(hostName); + for (auto it = range.first; it != range.second; ++it) { + if (it->second == connection) { + YT_LOG_DEBUG("Closing connection #%v", + connection->Id); + Connections_.erase(it); + return; + } + } +} + +void TConnectionPool::Refresh() +{ + auto guard = Guard(Lock_); + + // simple, since we don't expect too many connections + using TItem = std::pair<TInstant, TConnectionMap::iterator>; + std::vector<TItem> sortedConnections; + for (auto it = Connections_.begin(); it != Connections_.end(); ++it) { + sortedConnections.emplace_back(it->second->DeadLine, it); + } + + std::sort( + sortedConnections.begin(), + sortedConnections.end(), + [] (const TItem& a, const TItem& b) -> bool { + return a.first < b.first; + }); + + auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize; + + const auto now = TInstant::Now(); + for (const auto& item : sortedConnections) { + const auto& mapIterator = item.second; + auto connection = mapIterator->second; + if (AtomicGet(connection->Busy)) { + continue; + } + + if (removeCount > 0) { + Connections_.erase(mapIterator); + YT_LOG_DEBUG("Closing connection #%v (too many opened connections)", + connection->Id); + --removeCount; + continue; + } + + if (connection->DeadLine < now) { + Connections_.erase(mapIterator); + YT_LOG_DEBUG("Closing connection #%v (timeout)", + connection->Id); + } + } +} + +SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) +{ + int lastError = 0; + + for (auto i = address->Begin(); i != address->End(); ++i) { + struct addrinfo* info = &*i; + + if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) { + continue; + } + + if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) { + continue; + } + + TSocketHolder socket( + ::socket(info->ai_family, info->ai_socktype, info->ai_protocol)); + + if (socket.Closed()) { + lastError = LastSystemError(); + continue; + } + + SetNonBlock(socket, true); + if (TConfig::Get()->SocketPriority) { + SetSocketPriority(socket, *TConfig::Get()->SocketPriority); + } + + if (connect(socket, info->ai_addr, info->ai_addrlen) == 0) + return socket.Release(); + + int err = LastSystemError(); + if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) { + struct pollfd p = { + socket, + POLLOUT, + 0 + }; + const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout); + if (n < 0) { + ythrow TSystemError(-(int)n) << "can not connect to " << info; + } + CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error"); + if (!err) + return socket.Release(); + } + + lastError = err; + continue; + } + + ythrow TSystemError(lastError) << "can not connect to " << *address; +} + +//////////////////////////////////////////////////////////////////////////////// + +static TMaybe<TString> GetProxyName(const THttpInput& input) +{ + if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { + return proxyHeader->Value(); + } + return Nothing(); +} + +THttpResponse::THttpResponse( + IInputStream* socketStream, + const TString& requestId, + const TString& hostName) + : HttpInput_(socketStream) + , RequestId_(requestId) + , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) + , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) +{ + HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + if (HttpCode_ == 200 || HttpCode_ == 202) { + return; + } + + ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + + auto logAndSetError = [&] (const TString& rawError) { + YT_LOG_ERROR("RSP %v - HTTP %v - %v", + RequestId_, + HttpCode_, + rawError.data()); + ErrorResponse_->SetRawError(rawError); + }; + + switch (HttpCode_) { + case 429: + logAndSetError("request rate limit exceeded"); + break; + + case 500: + logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + break; + + default: { + TStringStream httpHeaders; + httpHeaders << "HTTP headers ("; + for (const auto& header : HttpInput_.Headers()) { + httpHeaders << header.Name() << ": " << header.Value() << "; "; + } + httpHeaders << ")"; + + auto errorString = Sprintf("RSP %s - HTTP %d - %s", + RequestId_.data(), + HttpCode_, + httpHeaders.Str().data()); + + YT_LOG_ERROR("%v", + errorString.data()); + + if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + ErrorResponse_ = parsedResponse.GetRef(); + } else { + ErrorResponse_->SetRawError( + errorString + " - X-YT-Error is missing in headers"); + } + break; + } + } +} + +const THttpHeaders& THttpResponse::Headers() const +{ + return HttpInput_.Headers(); +} + +void THttpResponse::CheckErrorResponse() const +{ + if (ErrorResponse_) { + throw *ErrorResponse_; + } +} + +bool THttpResponse::IsExhausted() const +{ + return IsExhausted_; +} + +int THttpResponse::GetHttpCode() const +{ + return HttpCode_; +} + +const TString& THttpResponse::GetHostName() const +{ + return HostName_; +} + +bool THttpResponse::IsKeepAlive() const +{ + return HttpInput_.IsKeepAlive(); +} + +TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) +{ + for (const auto& header : headers) { + if (header.Name() == "X-YT-Error") { + TErrorResponse errorResponse(HttpCode_, RequestId_); + errorResponse.ParseFromJsonError(header.Value()); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + } + return Nothing(); +} + +size_t THttpResponse::DoRead(void* buf, size_t len) +{ + size_t read; + if (Unframe_) { + read = UnframeRead(buf, len); + } else { + read = HttpInput_.Read(buf, len); + } + if (read == 0 && len != 0) { + // THttpInput MUST return defined (but may be empty) + // trailers when it is exhausted. + Y_VERIFY(HttpInput_.Trailers().Defined(), + "trailers MUST be defined for exhausted stream"); + CheckTrailers(HttpInput_.Trailers().GetRef()); + IsExhausted_ = true; + } + return read; +} + +size_t THttpResponse::DoSkip(size_t len) +{ + size_t skipped; + if (Unframe_) { + skipped = UnframeSkip(len); + } else { + skipped = HttpInput_.Skip(len); + } + if (skipped == 0 && len != 0) { + // THttpInput MUST return defined (but may be empty) + // trailers when it is exhausted. + Y_VERIFY(HttpInput_.Trailers().Defined(), + "trailers MUST be defined for exhausted stream"); + CheckTrailers(HttpInput_.Trailers().GetRef()); + IsExhausted_ = true; + } + return skipped; +} + +void THttpResponse::CheckTrailers(const THttpHeaders& trailers) +{ + if (auto errorResponse = ParseError(trailers)) { + errorResponse->SetIsFromTrailers(true); + YT_LOG_ERROR("RSP %v - %v", + RequestId_, + errorResponse.GetRef().what()); + ythrow errorResponse.GetRef(); + } +} + +static ui32 ReadDataFrameSize(THttpInput* stream) +{ + ui32 littleEndianSize; + auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); + if (read < sizeof(littleEndianSize)) { + ythrow yexception() << "Bad data frame header: " << + "expected " << sizeof(littleEndianSize) << " bytes, got " << read; + } + return LittleToHost(littleEndianSize); +} + +bool THttpResponse::RefreshFrameIfNecessary() +{ + while (RemainingFrameSize_ == 0) { + ui8 frameTypeByte; + auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + if (read == 0) { + return false; + } + auto frameType = static_cast<EFrameType>(frameTypeByte); + switch (frameType) { + case EFrameType::KeepAlive: + break; + case EFrameType::Data: + RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + break; + default: + ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); + } + } + return true; +} + +size_t THttpResponse::UnframeRead(void* buf, size_t len) +{ + if (!RefreshFrameIfNecessary()) { + return 0; + } + auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + RemainingFrameSize_ -= read; + return read; +} + +size_t THttpResponse::UnframeSkip(size_t len) +{ + if (!RefreshFrameIfNecessary()) { + return 0; + } + auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + RemainingFrameSize_ -= skipped; + return skipped; +} + +//////////////////////////////////////////////////////////////////////////////// + +THttpRequest::THttpRequest() +{ + RequestId = CreateGuidAsString(); +} + +THttpRequest::THttpRequest(const TString& requestId) + : RequestId(requestId) +{ } + +THttpRequest::~THttpRequest() +{ + if (!Connection) { + return; + } + + if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + // We should return to the pool only connections where HTTP response was fully read. + // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). + TConnectionPool::Get()->Release(Connection); + } else { + TConnectionPool::Get()->Invalidate(HostName, Connection); + } +} + +TString THttpRequest::GetRequestId() const +{ + return RequestId; +} + +void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +{ + HostName = std::move(hostName); + YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", + RequestId, + HostName); + + StartTime_ = TInstant::Now(); + Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + + YT_LOG_DEBUG("REQ %v - connection #%v", + RequestId, + Connection->Id); +} + +IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) +{ + auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); + Url_ = header.GetUrl(); + + LogRequest(header, Url_, includeParameters, RequestId, HostName); + + LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + + auto outputFormat = header.GetOutputFormat(); + if (outputFormat && outputFormat->IsTextYson()) { + LogResponse = true; + } + + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + + RequestStream_->Write(strHeader.data(), strHeader.size()); + return RequestStream_.Get(); +} + +IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +{ + return StartRequestImpl(header, true); +} + +void THttpRequest::FinishRequest() +{ + RequestStream_->Flush(); + RequestStream_->Finish(); +} + +void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +{ + if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { + const auto& parameters = header.GetParameters(); + auto parametersStr = NodeToYsonString(parameters); + auto* output = StartRequestImpl(header, false); + output->Write(parametersStr); + FinishRequest(); + } else { + auto* output = StartRequest(header); + if (body) { + output->Write(*body); + } + FinishRequest(); + } +} + +THttpResponse* THttpRequest::GetResponseStream() +{ + if (!Input) { + SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (TConfig::Get()->UseAbortableResponse) { + Y_VERIFY(!Url_.empty()); + Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + } else { + Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + } + Input->CheckErrorResponse(); + } + return Input.Get(); +} + +TString THttpRequest::GetResponse() +{ + TString result = GetResponseStream()->ReadAll(); + + TStringStream loggedAttributes; + loggedAttributes + << "Time: " << TInstant::Now() - StartTime_ << "; " + << "HostName: " << GetResponseStream()->GetHostName() << "; " + << LoggedAttributes_; + + if (LogResponse) { + constexpr auto sizeLimit = 1 << 7; + YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", + RequestId, + TruncateForLogs(result, sizeLimit), + loggedAttributes.Str()); + } else { + YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", + RequestId, + result.size(), + loggedAttributes.Str()); + } + return result; +} + +int THttpRequest::GetHttpCode() { + return GetResponseStream()->GetHttpCode(); +} + +void THttpRequest::InvalidateConnection() +{ + TConnectionPool::Get()->Invalidate(HostName, Connection); + Connection.Reset(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |