summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/http_client.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 11:13:34 +0300
committermax42 <[email protected]>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/http/http_client.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/http/http_client.cpp')
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp603
1 files changed, 0 insertions, 603 deletions
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
deleted file mode 100644
index a2af1182dcf..00000000000
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ /dev/null
@@ -1,603 +0,0 @@
-#include "http_client.h"
-
-#include "abortable_http_response.h"
-#include "core.h"
-#include "helpers.h"
-#include "http.h"
-
-#include <yt/cpp/mapreduce/interface/config.h>
-
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-#include <yt/yt/core/concurrency/thread_pool_poller.h>
-
-#include <yt/yt/core/http/client.h>
-#include <yt/yt/core/http/config.h>
-#include <yt/yt/core/http/http.h>
-
-#include <yt/yt/core/https/client.h>
-#include <yt/yt/core/https/config.h>
-
-#include <library/cpp/yson/node/node_io.h>
-
-namespace NYT::NHttpClient {
-
-namespace {
-
-TString CreateHost(TStringBuf host, TStringBuf port)
-{
- if (!port.empty()) {
- return Format("%v:%v", host, port);
- }
-
- return TString(host);
-}
-
-TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
-{
- auto httpCode = response->GetStatusCode();
- if (httpCode == NHttp::EStatusCode::OK || httpCode == NHttp::EStatusCode::Accepted) {
- return {};
- }
-
- TErrorResponse errorResponse(static_cast<int>(httpCode), requestId);
-
- auto logAndSetError = [&] (const TString& rawError) {
- YT_LOG_ERROR("RSP %v - HTTP %v - %v",
- requestId,
- httpCode,
- rawError.data());
- errorResponse.SetRawError(rawError);
- };
-
- switch (httpCode) {
- case NHttp::EStatusCode::TooManyRequests:
- logAndSetError("request rate limit exceeded");
- break;
-
- case NHttp::EStatusCode::InternalServerError:
- logAndSetError("internal error in proxy " + hostName);
- break;
-
- default: {
- TStringStream httpHeaders;
- httpHeaders << "HTTP headers (";
- for (const auto& [headerName, headerValue] : response->GetHeaders()->Dump()) {
- httpHeaders << headerName << ": " << headerValue << "; ";
- }
- httpHeaders << ")";
-
- auto errorString = Sprintf("RSP %s - HTTP %d - %s",
- requestId.data(),
- static_cast<int>(httpCode),
- httpHeaders.Str().data());
-
- YT_LOG_ERROR("%v",
- errorString.data());
-
- if (auto errorHeader = response->GetHeaders()->Find("X-YT-Error")) {
- errorResponse.ParseFromJsonError(*errorHeader);
- if (errorResponse.IsOk()) {
- return Nothing();
- }
- return errorResponse;
- }
-
- errorResponse.SetRawError(
- errorString + " - X-YT-Error is missing in headers");
- break;
- }
- }
-
- return errorResponse;
-}
-
-void CheckErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
-{
- auto errorResponse = GetErrorResponse(hostName, requestId, response);
- if (errorResponse) {
- throw *errorResponse;
- }
-}
-
-} // namespace
-
-///////////////////////////////////////////////////////////////////////////////
-
-class TDefaultHttpResponse
- : public IHttpResponse
-{
-public:
- TDefaultHttpResponse(std::unique_ptr<THttpRequest> request)
- : Request_(std::move(request))
- { }
-
- int GetStatusCode() override
- {
- return Request_->GetHttpCode();
- }
-
- IInputStream* GetResponseStream() override
- {
- return Request_->GetResponseStream();
- }
-
- TString GetResponse() override
- {
- return Request_->GetResponse();
- }
-
- TString GetRequestId() const override
- {
- return Request_->GetRequestId();
- }
-
-private:
- std::unique_ptr<THttpRequest> Request_;
-};
-
-class TDefaultHttpRequest
- : public IHttpRequest
-{
-public:
- TDefaultHttpRequest(std::unique_ptr<THttpRequest> request, IOutputStream* stream)
- : Request_(std::move(request))
- , Stream_(stream)
- { }
-
- IOutputStream* GetStream() override
- {
- return Stream_;
- }
-
- IHttpResponsePtr Finish() override
- {
- Request_->FinishRequest();
- return std::make_unique<TDefaultHttpResponse>(std::move(Request_));
- }
-
-private:
- std::unique_ptr<THttpRequest> Request_;
- IOutputStream* Stream_;
-};
-
-class TDefaultHttpClient
- : public IHttpClient
-{
-public:
- IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override
- {
- auto request = std::make_unique<THttpRequest>(requestId);
-
- auto urlRef = NHttp::ParseUrl(url);
-
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- request->SmallRequest(header, body);
- return std::make_unique<TDefaultHttpResponse>(std::move(request));
- }
-
- IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
- {
- auto request = std::make_unique<THttpRequest>(requestId);
-
- auto urlRef = NHttp::ParseUrl(url);
-
- request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
- auto stream = request->StartRequest(header);
- return std::make_unique<TDefaultHttpRequest>(std::move(request), stream);
- }
-};
-
-///////////////////////////////////////////////////////////////////////////////
-
-struct TCoreRequestContext
-{
- TString HostName;
- TString Url;
- TString RequestId;
- bool LogResponse;
- TInstant StartTime;
- TString LoggedAttributes;
-};
-
-class TCoreHttpResponse
- : public IHttpResponse
-{
-public:
- TCoreHttpResponse(
- TCoreRequestContext context,
- NHttp::IResponsePtr response)
- : Context_(std::move(context))
- , Response_(std::move(response))
- { }
-
- int GetStatusCode() override
- {
- return static_cast<int>(Response_->GetStatusCode());
- }
-
- IInputStream* GetResponseStream() override
- {
- if (!Stream_) {
- auto stream = std::make_unique<TWrappedStream>(
- NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
- Response_,
- Context_.RequestId);
- CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_);
-
- if (TConfig::Get()->UseAbortableResponse) {
- Y_VERIFY(!Context_.Url.empty());
- Stream_ = std::make_unique<TAbortableCoreHttpResponse>(std::move(stream), Context_.Url);
- } else {
- Stream_ = std::move(stream);
- }
- }
-
- return Stream_.get();
- }
-
- TString GetResponse() override
- {
- auto result = GetResponseStream()->ReadAll();
-
- TStringStream loggedAttributes;
- loggedAttributes
- << "Time: " << TInstant::Now() - Context_.StartTime << "; "
- << "HostName: " << Context_.HostName << "; "
- << Context_.LoggedAttributes;
-
- if (Context_.LogResponse) {
- constexpr auto sizeLimit = 1 << 7;
- YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
- Context_.RequestId,
- TruncateForLogs(result, sizeLimit),
- loggedAttributes.Str());
- } else {
- YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
- Context_.RequestId,
- result.size(),
- loggedAttributes.Str());
- }
- return result;
- }
-
- TString GetRequestId() const override
- {
- return Context_.RequestId;
- }
-
-private:
- class TWrappedStream
- : public IInputStream
- {
- public:
- TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
- : Underlying_(std::move(underlying))
- , Response_(std::move(response))
- , RequestId_(std::move(requestId))
- { }
-
- protected:
- size_t DoRead(void* buf, size_t len) override
- {
- size_t read = Underlying_->Read(buf, len);
-
- if (read == 0 && len != 0) {
- CheckTrailers(Response_->GetTrailers());
- }
- return read;
- }
-
- size_t DoSkip(size_t len) override
- {
- size_t skipped = Underlying_->Skip(len);
- if (skipped == 0 && len != 0) {
- CheckTrailers(Response_->GetTrailers());
- }
- return skipped;
- }
-
- private:
- void CheckTrailers(const NHttp::THeadersPtr& trailers)
- {
- if (auto errorResponse = ParseError(trailers)) {
- errorResponse->SetIsFromTrailers(true);
- YT_LOG_ERROR("RSP %v - %v",
- RequestId_,
- errorResponse.GetRef().what());
- ythrow errorResponse.GetRef();
- }
- }
-
- TMaybe<TErrorResponse> ParseError(const NHttp::THeadersPtr& headers)
- {
- if (auto errorHeader = headers->Find("X-YT-Error")) {
- TErrorResponse errorResponse(static_cast<int>(Response_->GetStatusCode()), RequestId_);
- errorResponse.ParseFromJsonError(*errorHeader);
- if (errorResponse.IsOk()) {
- return Nothing();
- }
- return errorResponse;
- }
- return Nothing();
- }
-
- private:
- std::unique_ptr<IInputStream> Underlying_;
- NHttp::IResponsePtr Response_;
- TString RequestId_;
- };
-
-private:
- TCoreRequestContext Context_;
- NHttp::IResponsePtr Response_;
- std::unique_ptr<IInputStream> Stream_;
-};
-
-class TCoreHttpRequest
- : public IHttpRequest
-{
-public:
- TCoreHttpRequest(TCoreRequestContext context, NHttp::IActiveRequestPtr activeRequest)
- : Context_(std::move(context))
- , ActiveRequest_(std::move(activeRequest))
- , Stream_(NConcurrency::CreateBufferedSyncAdapter(ActiveRequest_->GetRequestStream()))
- , WrappedStream_(this, Stream_.get())
- { }
-
- IOutputStream* GetStream() override
- {
- return &WrappedStream_;
- }
-
- IHttpResponsePtr Finish() override
- {
- WrappedStream_.Flush();
- auto response = ActiveRequest_->Finish().Get().ValueOrThrow();
- return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
- }
-
- IHttpResponsePtr FinishWithError()
- {
- auto response = ActiveRequest_->GetResponse();
- return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
- }
-
-private:
- class TWrappedStream
- : public IOutputStream
- {
- public:
- TWrappedStream(TCoreHttpRequest* httpRequest, IOutputStream* underlying)
- : HttpRequest_(httpRequest)
- , Underlying_(underlying)
- { }
-
- private:
- void DoWrite(const void* buf, size_t len) override
- {
- WrapWriteFunc([&] {
- Underlying_->Write(buf, len);
- });
- }
-
- void DoWriteV(const TPart* parts, size_t count) override
- {
- WrapWriteFunc([&] {
- Underlying_->Write(parts, count);
- });
- }
-
- void DoWriteC(char ch) override
- {
- WrapWriteFunc([&] {
- Underlying_->Write(ch);
- });
- }
-
- void DoFlush() override
- {
- WrapWriteFunc([&] {
- Underlying_->Flush();
- });
- }
-
- void DoFinish() override
- {
- WrapWriteFunc([&] {
- Underlying_->Finish();
- });
- }
-
- void WrapWriteFunc(std::function<void()> func)
- {
- CheckErrorState();
- try {
- func();
- } catch (const std::exception&) {
- HandleWriteException();
- }
- }
-
- // In many cases http proxy stops reading request and resets connection
- // if error has happend. This function tries to read error response
- // in such cases.
- void HandleWriteException() {
- Y_VERIFY(WriteError_ == nullptr);
- WriteError_ = std::current_exception();
- Y_VERIFY(WriteError_ != nullptr);
- try {
- HttpRequest_->FinishWithError()->GetResponseStream();
- } catch (const TErrorResponse &) {
- throw;
- } catch (...) {
- }
- std::rethrow_exception(WriteError_);
- }
-
- void CheckErrorState()
- {
- if (WriteError_) {
- std::rethrow_exception(WriteError_);
- }
- }
-
- private:
- TCoreHttpRequest* const HttpRequest_;
- IOutputStream* Underlying_;
- std::exception_ptr WriteError_;
- };
-
-private:
- TCoreRequestContext Context_;
- NHttp::IActiveRequestPtr ActiveRequest_;
- std::unique_ptr<IOutputStream> Stream_;
- TWrappedStream WrappedStream_;
-};
-
-class TCoreHttpClient
- : public IHttpClient
-{
-public:
- TCoreHttpClient(bool useTLS, const TConfigPtr& config)
- : Poller_(NConcurrency::CreateThreadPoolPoller(1, "http_poller")) // TODO(nadya73): YT-18363: move threads count to config
- {
- if (useTLS) {
- auto httpsConfig = NYT::New<NYT::NHttps::TClientConfig>();
- httpsConfig->MaxIdleConnections = config->ConnectionPoolSize;
- Client_ = NHttps::CreateClient(httpsConfig, Poller_);
- } else {
- auto httpConfig = NYT::New<NYT::NHttp::TClientConfig>();
- httpConfig->MaxIdleConnections = config->ConnectionPoolSize;
- Client_ = NHttp::CreateClient(httpConfig, Poller_);
- }
- }
-
- IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header, TMaybe<TStringBuf> body) override
- {
- TCoreRequestContext context = CreateContext(url, requestId, header);
-
- // TODO(nadya73): YT-18363: pass socket timeouts from THttpConfig
-
- NHttp::IResponsePtr response;
-
- auto logRequest = [&](bool includeParameters) {
- LogRequest(header, url, includeParameters, requestId, context.HostName);
- context.LoggedAttributes = GetLoggedAttributes(header, url, includeParameters, 128);
- };
-
- if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
- const auto& parameters = header.GetParameters();
- auto parametersStr = NodeToYsonString(parameters);
-
- bool includeParameters = false;
- auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
-
- logRequest(includeParameters);
-
- auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
-
- activeRequest->GetRequestStream()->Write(TSharedRef::FromString(parametersStr)).Get().ThrowOnError();
- response = activeRequest->Finish().Get().ValueOrThrow();
- } else {
- auto bodyRef = TSharedRef::FromString(TString(body ? *body : ""));
- bool includeParameters = true;
- auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
-
- logRequest(includeParameters);
-
- if (header.GetMethod() == "GET") {
- response = RequestImpl(header.GetMethod(), url, headers, bodyRef);
- } else {
- auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
-
- auto request = std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
- if (body) {
- request->GetStream()->Write(*body);
- }
- return request->Finish();
- }
- }
-
- return std::make_unique<TCoreHttpResponse>(std::move(context), std::move(response));
- }
-
- IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header) override
- {
- TCoreRequestContext context = CreateContext(url, requestId, header);
-
- LogRequest(header, url, true, requestId, context.HostName);
- context.LoggedAttributes = GetLoggedAttributes(header, url, true, 128);
-
- auto headers = header.GetHeader(context.HostName, requestId, true).Get();
- auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
-
- return std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
- }
-
-private:
- TCoreRequestContext CreateContext(const TString& url, const TString& requestId, const THttpHeader& header)
- {
- TCoreRequestContext context;
- context.Url = url;
- context.RequestId = requestId;
-
- auto urlRef = NHttp::ParseUrl(url);
- context.HostName = CreateHost(urlRef.Host, urlRef.PortStr);
-
- context.LogResponse = false;
- auto outputFormat = header.GetOutputFormat();
- if (outputFormat && outputFormat->IsTextYson()) {
- context.LogResponse = true;
- }
- context.StartTime = TInstant::Now();
- return context;
- }
-
- NHttp::IResponsePtr RequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers, const TSharedRef& body)
- {
- if (method == "GET") {
- return Client_->Get(url, headers).Get().ValueOrThrow();
- } else if (method == "POST") {
- return Client_->Post(url, body, headers).Get().ValueOrThrow();
- } else if (method == "PUT") {
- return Client_->Put(url, body, headers).Get().ValueOrThrow();
- } else {
- YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
- method,
- url);
- }
- }
-
- NHttp::IActiveRequestPtr StartRequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers)
- {
- if (method == "POST") {
- return Client_->StartPost(url, headers).Get().ValueOrThrow();
- } else if (method == "PUT") {
- return Client_->StartPut(url, headers).Get().ValueOrThrow();
- } else {
- YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
- method,
- url);
- }
- }
-
- NConcurrency::IThreadPoolPollerPtr Poller_;
- NHttp::IClientPtr Client_;
-};
-
-///////////////////////////////////////////////////////////////////////////////
-
-IHttpClientPtr CreateDefaultHttpClient()
-{
- return std::make_shared<TDefaultHttpClient>();
-}
-
-IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config)
-{
- return std::make_shared<TCoreHttpClient>(useTLS, config);
-}
-
-///////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NHttpClient