diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/http/http_client.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/http/http_client.cpp')
-rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 603 |
1 files changed, 603 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp new file mode 100644 index 0000000000..a2af1182dc --- /dev/null +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -0,0 +1,603 @@ +#include "http_client.h" + +#include "abortable_http_response.h" +#include "core.h" +#include "helpers.h" +#include "http.h" + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/yt/core/concurrency/thread_pool_poller.h> + +#include <yt/yt/core/http/client.h> +#include <yt/yt/core/http/config.h> +#include <yt/yt/core/http/http.h> + +#include <yt/yt/core/https/client.h> +#include <yt/yt/core/https/config.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT::NHttpClient { + +namespace { + +TString CreateHost(TStringBuf host, TStringBuf port) +{ + if (!port.empty()) { + return Format("%v:%v", host, port); + } + + return TString(host); +} + +TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response) +{ + auto httpCode = response->GetStatusCode(); + if (httpCode == NHttp::EStatusCode::OK || httpCode == NHttp::EStatusCode::Accepted) { + return {}; + } + + TErrorResponse errorResponse(static_cast<int>(httpCode), requestId); + + auto logAndSetError = [&] (const TString& rawError) { + YT_LOG_ERROR("RSP %v - HTTP %v - %v", + requestId, + httpCode, + rawError.data()); + errorResponse.SetRawError(rawError); + }; + + switch (httpCode) { + case NHttp::EStatusCode::TooManyRequests: + logAndSetError("request rate limit exceeded"); + break; + + case NHttp::EStatusCode::InternalServerError: + logAndSetError("internal error in proxy " + hostName); + break; + + default: { + TStringStream httpHeaders; + httpHeaders << "HTTP headers ("; + for (const auto& [headerName, headerValue] : response->GetHeaders()->Dump()) { + httpHeaders << headerName << ": " << headerValue << "; "; + } + httpHeaders << ")"; + + auto errorString = Sprintf("RSP %s - HTTP %d - %s", + requestId.data(), + static_cast<int>(httpCode), + httpHeaders.Str().data()); + + YT_LOG_ERROR("%v", + errorString.data()); + + if (auto errorHeader = response->GetHeaders()->Find("X-YT-Error")) { + errorResponse.ParseFromJsonError(*errorHeader); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + + errorResponse.SetRawError( + errorString + " - X-YT-Error is missing in headers"); + break; + } + } + + return errorResponse; +} + +void CheckErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response) +{ + auto errorResponse = GetErrorResponse(hostName, requestId, response); + if (errorResponse) { + throw *errorResponse; + } +} + +} // namespace + +/////////////////////////////////////////////////////////////////////////////// + +class TDefaultHttpResponse + : public IHttpResponse +{ +public: + TDefaultHttpResponse(std::unique_ptr<THttpRequest> request) + : Request_(std::move(request)) + { } + + int GetStatusCode() override + { + return Request_->GetHttpCode(); + } + + IInputStream* GetResponseStream() override + { + return Request_->GetResponseStream(); + } + + TString GetResponse() override + { + return Request_->GetResponse(); + } + + TString GetRequestId() const override + { + return Request_->GetRequestId(); + } + +private: + std::unique_ptr<THttpRequest> Request_; +}; + +class TDefaultHttpRequest + : public IHttpRequest +{ +public: + TDefaultHttpRequest(std::unique_ptr<THttpRequest> request, IOutputStream* stream) + : Request_(std::move(request)) + , Stream_(stream) + { } + + IOutputStream* GetStream() override + { + return Stream_; + } + + IHttpResponsePtr Finish() override + { + Request_->FinishRequest(); + return std::make_unique<TDefaultHttpResponse>(std::move(Request_)); + } + +private: + std::unique_ptr<THttpRequest> Request_; + IOutputStream* Stream_; +}; + +class TDefaultHttpClient + : public IHttpClient +{ +public: + IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override + { + auto request = std::make_unique<THttpRequest>(requestId); + + auto urlRef = NHttp::ParseUrl(url); + + request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + request->SmallRequest(header, body); + return std::make_unique<TDefaultHttpResponse>(std::move(request)); + } + + IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override + { + auto request = std::make_unique<THttpRequest>(requestId); + + auto urlRef = NHttp::ParseUrl(url); + + request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + auto stream = request->StartRequest(header); + return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); + } +}; + +/////////////////////////////////////////////////////////////////////////////// + +struct TCoreRequestContext +{ + TString HostName; + TString Url; + TString RequestId; + bool LogResponse; + TInstant StartTime; + TString LoggedAttributes; +}; + +class TCoreHttpResponse + : public IHttpResponse +{ +public: + TCoreHttpResponse( + TCoreRequestContext context, + NHttp::IResponsePtr response) + : Context_(std::move(context)) + , Response_(std::move(response)) + { } + + int GetStatusCode() override + { + return static_cast<int>(Response_->GetStatusCode()); + } + + IInputStream* GetResponseStream() override + { + if (!Stream_) { + auto stream = std::make_unique<TWrappedStream>( + NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor), + Response_, + Context_.RequestId); + CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); + + if (TConfig::Get()->UseAbortableResponse) { + Y_VERIFY(!Context_.Url.empty()); + Stream_ = std::make_unique<TAbortableCoreHttpResponse>(std::move(stream), Context_.Url); + } else { + Stream_ = std::move(stream); + } + } + + return Stream_.get(); + } + + TString GetResponse() override + { + auto result = GetResponseStream()->ReadAll(); + + TStringStream loggedAttributes; + loggedAttributes + << "Time: " << TInstant::Now() - Context_.StartTime << "; " + << "HostName: " << Context_.HostName << "; " + << Context_.LoggedAttributes; + + if (Context_.LogResponse) { + constexpr auto sizeLimit = 1 << 7; + YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", + Context_.RequestId, + TruncateForLogs(result, sizeLimit), + loggedAttributes.Str()); + } else { + YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", + Context_.RequestId, + result.size(), + loggedAttributes.Str()); + } + return result; + } + + TString GetRequestId() const override + { + return Context_.RequestId; + } + +private: + class TWrappedStream + : public IInputStream + { + public: + TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId) + : Underlying_(std::move(underlying)) + , Response_(std::move(response)) + , RequestId_(std::move(requestId)) + { } + + protected: + size_t DoRead(void* buf, size_t len) override + { + size_t read = Underlying_->Read(buf, len); + + if (read == 0 && len != 0) { + CheckTrailers(Response_->GetTrailers()); + } + return read; + } + + size_t DoSkip(size_t len) override + { + size_t skipped = Underlying_->Skip(len); + if (skipped == 0 && len != 0) { + CheckTrailers(Response_->GetTrailers()); + } + return skipped; + } + + private: + void CheckTrailers(const NHttp::THeadersPtr& trailers) + { + if (auto errorResponse = ParseError(trailers)) { + errorResponse->SetIsFromTrailers(true); + YT_LOG_ERROR("RSP %v - %v", + RequestId_, + errorResponse.GetRef().what()); + ythrow errorResponse.GetRef(); + } + } + + TMaybe<TErrorResponse> ParseError(const NHttp::THeadersPtr& headers) + { + if (auto errorHeader = headers->Find("X-YT-Error")) { + TErrorResponse errorResponse(static_cast<int>(Response_->GetStatusCode()), RequestId_); + errorResponse.ParseFromJsonError(*errorHeader); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + return Nothing(); + } + + private: + std::unique_ptr<IInputStream> Underlying_; + NHttp::IResponsePtr Response_; + TString RequestId_; + }; + +private: + TCoreRequestContext Context_; + NHttp::IResponsePtr Response_; + std::unique_ptr<IInputStream> Stream_; +}; + +class TCoreHttpRequest + : public IHttpRequest +{ +public: + TCoreHttpRequest(TCoreRequestContext context, NHttp::IActiveRequestPtr activeRequest) + : Context_(std::move(context)) + , ActiveRequest_(std::move(activeRequest)) + , Stream_(NConcurrency::CreateBufferedSyncAdapter(ActiveRequest_->GetRequestStream())) + , WrappedStream_(this, Stream_.get()) + { } + + IOutputStream* GetStream() override + { + return &WrappedStream_; + } + + IHttpResponsePtr Finish() override + { + WrappedStream_.Flush(); + auto response = ActiveRequest_->Finish().Get().ValueOrThrow(); + return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response)); + } + + IHttpResponsePtr FinishWithError() + { + auto response = ActiveRequest_->GetResponse(); + return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response)); + } + +private: + class TWrappedStream + : public IOutputStream + { + public: + TWrappedStream(TCoreHttpRequest* httpRequest, IOutputStream* underlying) + : HttpRequest_(httpRequest) + , Underlying_(underlying) + { } + + private: + void DoWrite(const void* buf, size_t len) override + { + WrapWriteFunc([&] { + Underlying_->Write(buf, len); + }); + } + + void DoWriteV(const TPart* parts, size_t count) override + { + WrapWriteFunc([&] { + Underlying_->Write(parts, count); + }); + } + + void DoWriteC(char ch) override + { + WrapWriteFunc([&] { + Underlying_->Write(ch); + }); + } + + void DoFlush() override + { + WrapWriteFunc([&] { + Underlying_->Flush(); + }); + } + + void DoFinish() override + { + WrapWriteFunc([&] { + Underlying_->Finish(); + }); + } + + void WrapWriteFunc(std::function<void()> func) + { + CheckErrorState(); + try { + func(); + } catch (const std::exception&) { + HandleWriteException(); + } + } + + // In many cases http proxy stops reading request and resets connection + // if error has happend. This function tries to read error response + // in such cases. + void HandleWriteException() { + Y_VERIFY(WriteError_ == nullptr); + WriteError_ = std::current_exception(); + Y_VERIFY(WriteError_ != nullptr); + try { + HttpRequest_->FinishWithError()->GetResponseStream(); + } catch (const TErrorResponse &) { + throw; + } catch (...) { + } + std::rethrow_exception(WriteError_); + } + + void CheckErrorState() + { + if (WriteError_) { + std::rethrow_exception(WriteError_); + } + } + + private: + TCoreHttpRequest* const HttpRequest_; + IOutputStream* Underlying_; + std::exception_ptr WriteError_; + }; + +private: + TCoreRequestContext Context_; + NHttp::IActiveRequestPtr ActiveRequest_; + std::unique_ptr<IOutputStream> Stream_; + TWrappedStream WrappedStream_; +}; + +class TCoreHttpClient + : public IHttpClient +{ +public: + TCoreHttpClient(bool useTLS, const TConfigPtr& config) + : Poller_(NConcurrency::CreateThreadPoolPoller(1, "http_poller")) // TODO(nadya73): YT-18363: move threads count to config + { + if (useTLS) { + auto httpsConfig = NYT::New<NYT::NHttps::TClientConfig>(); + httpsConfig->MaxIdleConnections = config->ConnectionPoolSize; + Client_ = NHttps::CreateClient(httpsConfig, Poller_); + } else { + auto httpConfig = NYT::New<NYT::NHttp::TClientConfig>(); + httpConfig->MaxIdleConnections = config->ConnectionPoolSize; + Client_ = NHttp::CreateClient(httpConfig, Poller_); + } + } + + IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header, TMaybe<TStringBuf> body) override + { + TCoreRequestContext context = CreateContext(url, requestId, header); + + // TODO(nadya73): YT-18363: pass socket timeouts from THttpConfig + + NHttp::IResponsePtr response; + + auto logRequest = [&](bool includeParameters) { + LogRequest(header, url, includeParameters, requestId, context.HostName); + context.LoggedAttributes = GetLoggedAttributes(header, url, includeParameters, 128); + }; + + if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { + const auto& parameters = header.GetParameters(); + auto parametersStr = NodeToYsonString(parameters); + + bool includeParameters = false; + auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get(); + + logRequest(includeParameters); + + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + activeRequest->GetRequestStream()->Write(TSharedRef::FromString(parametersStr)).Get().ThrowOnError(); + response = activeRequest->Finish().Get().ValueOrThrow(); + } else { + auto bodyRef = TSharedRef::FromString(TString(body ? *body : "")); + bool includeParameters = true; + auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get(); + + logRequest(includeParameters); + + if (header.GetMethod() == "GET") { + response = RequestImpl(header.GetMethod(), url, headers, bodyRef); + } else { + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + auto request = std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest)); + if (body) { + request->GetStream()->Write(*body); + } + return request->Finish(); + } + } + + return std::make_unique<TCoreHttpResponse>(std::move(context), std::move(response)); + } + + IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header) override + { + TCoreRequestContext context = CreateContext(url, requestId, header); + + LogRequest(header, url, true, requestId, context.HostName); + context.LoggedAttributes = GetLoggedAttributes(header, url, true, 128); + + auto headers = header.GetHeader(context.HostName, requestId, true).Get(); + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + return std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest)); + } + +private: + TCoreRequestContext CreateContext(const TString& url, const TString& requestId, const THttpHeader& header) + { + TCoreRequestContext context; + context.Url = url; + context.RequestId = requestId; + + auto urlRef = NHttp::ParseUrl(url); + context.HostName = CreateHost(urlRef.Host, urlRef.PortStr); + + context.LogResponse = false; + auto outputFormat = header.GetOutputFormat(); + if (outputFormat && outputFormat->IsTextYson()) { + context.LogResponse = true; + } + context.StartTime = TInstant::Now(); + return context; + } + + NHttp::IResponsePtr RequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers, const TSharedRef& body) + { + if (method == "GET") { + return Client_->Get(url, headers).Get().ValueOrThrow(); + } else if (method == "POST") { + return Client_->Post(url, body, headers).Get().ValueOrThrow(); + } else if (method == "PUT") { + return Client_->Put(url, body, headers).Get().ValueOrThrow(); + } else { + YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)", + method, + url); + } + } + + NHttp::IActiveRequestPtr StartRequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers) + { + if (method == "POST") { + return Client_->StartPost(url, headers).Get().ValueOrThrow(); + } else if (method == "PUT") { + return Client_->StartPut(url, headers).Get().ValueOrThrow(); + } else { + YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)", + method, + url); + } + } + + NConcurrency::IThreadPoolPollerPtr Poller_; + NHttp::IClientPtr Client_; +}; + +/////////////////////////////////////////////////////////////////////////////// + +IHttpClientPtr CreateDefaultHttpClient() +{ + return std::make_shared<TDefaultHttpClient>(); +} + +IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config) +{ + return std::make_shared<TCoreHttpClient>(useTLS, config); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttpClient |