diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/http | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/http')
24 files changed, 3202 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..8212c9d8d16 --- /dev/null +++ b/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,37 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-http) +target_compile_options(cpp-mapreduce-http PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-http PUBLIC + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic + cpp-http-io + cpp-string_utils-base64 + cpp-string_utils-quote + cpp-threading-cron + cpp-mapreduce-common + cpp-mapreduce-interface + mapreduce-interface-logging + yt-core-http + yt-core-https +) +target_sources(cpp-mapreduce-http PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp +) diff --git a/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..b1993829102 --- /dev/null +++ b/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt @@ -0,0 +1,38 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-http) +target_compile_options(cpp-mapreduce-http PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-http PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic + cpp-http-io + cpp-string_utils-base64 + cpp-string_utils-quote + cpp-threading-cron + cpp-mapreduce-common + cpp-mapreduce-interface + mapreduce-interface-logging + yt-core-http + yt-core-https +) +target_sources(cpp-mapreduce-http PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp +) diff --git a/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..b1993829102 --- /dev/null +++ b/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt @@ -0,0 +1,38 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-http) +target_compile_options(cpp-mapreduce-http PRIVATE + -Wdeprecated-this-capture +) +target_link_libraries(cpp-mapreduce-http PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic + cpp-http-io + cpp-string_utils-base64 + cpp-string_utils-quote + cpp-threading-cron + cpp-mapreduce-common + cpp-mapreduce-interface + mapreduce-interface-logging + yt-core-http + yt-core-https +) +target_sources(cpp-mapreduce-http PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp +) diff --git a/yt/cpp/mapreduce/http/CMakeLists.txt b/yt/cpp/mapreduce/http/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/yt/cpp/mapreduce/http/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..b742e11f63c --- /dev/null +++ b/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-mapreduce-http) +target_link_libraries(cpp-mapreduce-http PUBLIC + contrib-libs-cxxsupp + yutil + cpp-deprecated-atomic + cpp-http-io + cpp-string_utils-base64 + cpp-string_utils-quote + cpp-threading-cron + cpp-mapreduce-common + cpp-mapreduce-interface + mapreduce-interface-logging + yt-core-http + yt-core-https +) +target_sources(cpp-mapreduce-http PRIVATE + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp + ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp +) diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp new file mode 100644 index 00000000000..9da9241d337 --- /dev/null +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -0,0 +1,223 @@ +#include "abortable_http_response.h" + +#include <util/system/mutex.h> +#include <util/generic/singleton.h> +#include <util/generic/hash_set.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableHttpResponseRegistry { +public: + TOutageId StartOutage(TString urlPattern, const TOutageOptions& options) + { + auto g = Guard(Lock_); + auto id = NextId_++; + IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_}); + return id; + } + + void StopOutage(TOutageId id) + { + auto g = Guard(Lock_); + IdToOutage.erase(id); + } + + void Add(IAbortableHttpResponse* response) + { + auto g = Guard(Lock_); + for (auto& [id, entry] : IdToOutage) { + if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) { + response->SetLengthLimit(entry.LengthLimit); + entry.Counter -= 1; + } + } + ResponseList_.PushBack(response); + } + + void Remove(IAbortableHttpResponse* response) + { + auto g = Guard(Lock_); + response->Unlink(); + } + + static TAbortableHttpResponseRegistry& Get() + { + return *Singleton<TAbortableHttpResponseRegistry>(); + } + + int AbortAll(const TString& urlPattern) + { + int result = 0; + for (auto& response : ResponseList_) { + if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) { + response.Abort(); + ++result; + } + } + return result; + } + +private: + struct TOutageEntry + { + TString Pattern; + size_t Counter; + size_t LengthLimit; + }; + +private: + TOutageId NextId_ = 0; + TIntrusiveList<IAbortableHttpResponse> ResponseList_; + THashMap<TOutageId, TOutageEntry> IdToOutage; + TMutex Lock_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TAbortableHttpResponse::TOutage::TOutage( + TString urlPattern, + TAbortableHttpResponseRegistry& registry, + const TOutageOptions& options) + : UrlPattern_(std::move(urlPattern)) + , Registry_(registry) + , Id_(registry.StartOutage(UrlPattern_, options)) +{ } + +TAbortableHttpResponse::TOutage::~TOutage() +{ + Stop(); +} + +void TAbortableHttpResponse::TOutage::Stop() +{ + if (!Stopped_) { + Registry_.StopOutage(Id_); + Stopped_ = true; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TAbortableHttpResponseBase::TAbortableHttpResponseBase(const TString& url) + : Url_(url) +{ + TAbortableHttpResponseRegistry::Get().Add(this); +} + +TAbortableHttpResponseBase::~TAbortableHttpResponseBase() +{ + TAbortableHttpResponseRegistry::Get().Remove(this); +} + +void TAbortableHttpResponseBase::Abort() +{ + Aborted_ = true; +} + +void TAbortableHttpResponseBase::SetLengthLimit(size_t limit) +{ + LengthLimit_ = limit; + if (LengthLimit_ == 0) { + Abort(); + } +} + +const TString& TAbortableHttpResponseBase::GetUrl() const +{ + return Url_; +} + +bool TAbortableHttpResponseBase::IsAborted() const +{ + return Aborted_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TAbortableHttpResponse::TAbortableHttpResponse( + IInputStream* socketStream, + const TString& requestId, + const TString& hostName, + const TString& url) + : THttpResponse(socketStream, requestId, hostName) + , TAbortableHttpResponseBase(url) +{ +} + +size_t TAbortableHttpResponse::DoRead(void* buf, size_t len) +{ + if (Aborted_) { + ythrow TAbortedForTestPurpose() << "response was aborted"; + } + len = std::min(len, LengthLimit_); + auto read = THttpResponse::DoRead(buf, len); + LengthLimit_ -= read; + if (LengthLimit_ == 0) { + Abort(); + } + return read; +} + +size_t TAbortableHttpResponse::DoSkip(size_t len) +{ + if (Aborted_) { + ythrow TAbortedForTestPurpose() << "response was aborted"; + } + return THttpResponse::DoSkip(len); +} + +int TAbortableHttpResponse::AbortAll(const TString& urlPattern) +{ + return TAbortableHttpResponseRegistry::Get().AbortAll(urlPattern); +} + +TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage( + const TString& urlPattern, + const TOutageOptions& options) +{ + return TOutage(urlPattern, TAbortableHttpResponseRegistry::Get(), options); +} + +TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage( + const TString& urlPattern, + size_t responseCount) +{ + return StartOutage(urlPattern, TOutageOptions().ResponseCount(responseCount)); +} + +TAbortableCoreHttpResponse::TAbortableCoreHttpResponse( + std::unique_ptr<IInputStream> stream, + const TString& url) + : TAbortableHttpResponseBase(url) + , Stream_(std::move(stream)) +{ +} + +size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len) +{ + if (Aborted_) { + ythrow TAbortedForTestPurpose() << "response was aborted"; + } + len = std::min(len, LengthLimit_); + auto read = Stream_->Read(buf, len); + LengthLimit_ -= read; + if (LengthLimit_ == 0) { + Abort(); + } + + return read; +} + +size_t TAbortableCoreHttpResponse::DoSkip(size_t len) +{ + if (Aborted_) { + ythrow TAbortedForTestPurpose() << "response was aborted"; + } + return Stream_->Skip(len); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h new file mode 100644 index 00000000000..d72bcfa0a69 --- /dev/null +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -0,0 +1,142 @@ +#pragma once + +#include "http.h" + +#include <util/generic/intrlist.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableHttpResponseRegistry; + +using TOutageId = size_t; + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortedForTestPurpose + : public yexception +{ }; + +struct TOutageOptions +{ + using TSelf = TOutageOptions; + + /// @brief Number of responses to abort. + FLUENT_FIELD_DEFAULT(size_t, ResponseCount, std::numeric_limits<size_t>::max()); + + /// @brief Number of bytes to read before abortion. If zero, abort immediately. + FLUENT_FIELD_DEFAULT(size_t, LengthLimit, 0); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class IAbortableHttpResponse + : public TIntrusiveListItem<IAbortableHttpResponse> +{ +public: + virtual void Abort() = 0; + virtual const TString& GetUrl() const = 0; + virtual bool IsAborted() const = 0; + virtual void SetLengthLimit(size_t limit) = 0; + + virtual ~IAbortableHttpResponse() = default; +}; + +class TAbortableHttpResponseBase + : public IAbortableHttpResponse +{ +public: + TAbortableHttpResponseBase(const TString& url); + ~TAbortableHttpResponseBase(); + + void Abort() override; + const TString& GetUrl() const override; + bool IsAborted() const override; + void SetLengthLimit(size_t limit) override; + +protected: + TString Url_; + std::atomic<bool> Aborted_ = {false}; + size_t LengthLimit_ = std::numeric_limits<size_t>::max(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors. +class TAbortableCoreHttpResponse + : public IInputStream + , public TAbortableHttpResponseBase +{ +public: + TAbortableCoreHttpResponse( + std::unique_ptr<IInputStream> stream, + const TString& url); + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + std::unique_ptr<IInputStream> Stream_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Class extends @ref NYT::THttpResponse with possibility to emulate errors. +class TAbortableHttpResponse + : public THttpResponse + , public TAbortableHttpResponseBase +{ +public: + class TOutage + { + public: + TOutage(TString urlPattern, TAbortableHttpResponseRegistry& registry, const TOutageOptions& options); + TOutage(TOutage&&) = default; + TOutage(const TOutage&) = delete; + ~TOutage(); + + void Stop(); + + private: + TString UrlPattern_; + TAbortableHttpResponseRegistry& Registry_; + TOutageId Id_; + bool Stopped_ = false; + }; + +public: + TAbortableHttpResponse( + IInputStream* socketStream, + const TString& requestId, + const TString& hostName, + const TString& url); + + /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url). + /// + /// @return number of aborted responses. + static int AbortAll(const TString& urlPattern); + + /// @brief Start outage. Future responses which match `urlPattern` (i.e. contain it in url) will fail. + /// + /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed) + [[nodiscard]] static TOutage StartOutage( + const TString& urlPattern, + const TOutageOptions& options = TOutageOptions()); + + /// @brief Start outage. Future `responseCount` responses which match `urlPattern` (i.e. contain it in url) will fail. + /// + /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed) + [[nodiscard]] static TOutage StartOutage( + const TString& urlPattern, + size_t responseCount); + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp new file mode 100644 index 00000000000..1c016263c51 --- /dev/null +++ b/yt/cpp/mapreduce/http/context.cpp @@ -0,0 +1,25 @@ +#include "context.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +bool operator==(const TClientContext& lhs, const TClientContext& rhs) +{ + return lhs.ServerName == rhs.ServerName && + lhs.Token == rhs.Token && + lhs.ImpersonationUser == rhs.ImpersonationUser && + lhs.ServiceTicketAuth == rhs.ServiceTicketAuth && + lhs.HttpClient == rhs.HttpClient && + lhs.UseTLS == rhs.UseTLS && + lhs.TvmOnly == rhs.TvmOnly; +} + +bool operator!=(const TClientContext& lhs, const TClientContext& rhs) +{ + return !(rhs == lhs); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h new file mode 100644 index 00000000000..3926373e174 --- /dev/null +++ b/yt/cpp/mapreduce/http/context.h @@ -0,0 +1,31 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/public.h> + + +namespace NYT { + +/////////////////////////////////////////////////////////////////////////////// + +struct TClientContext +{ + TString ServerName; + TString Token; + TMaybe<TString> ImpersonationUser; + NAuth::IServiceTicketAuthPtrWrapperPtr ServiceTicketAuth; + NHttpClient::IHttpClientPtr HttpClient; + bool TvmOnly = false; + bool UseTLS = false; + TConfigPtr Config = TConfig::Get(); +}; + +bool operator==(const TClientContext& lhs, const TClientContext& rhs); +bool operator!=(const TClientContext& lhs, const TClientContext& rhs); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/core.h b/yt/cpp/mapreduce/http/core.h new file mode 100644 index 00000000000..37c74d75515 --- /dev/null +++ b/yt/cpp/mapreduce/http/core.h @@ -0,0 +1,27 @@ +#pragma once + +#include <yt/yt/core/http/public.h> + +#include <memory> + +namespace NYT::NHttp { + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Wrapper for THeaderPtr which allows to hide NYT::IntrusivePtr from interfaces. +struct THeadersPtrWrapper +{ + THeadersPtrWrapper(THeadersPtr ptr) + : Ptr(std::make_shared<THeadersPtr>(std::move(ptr))) + { } + + THeadersPtr Get() { + return *Ptr; + } + + std::shared_ptr<THeadersPtr> Ptr; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttp diff --git a/yt/cpp/mapreduce/http/fwd.h b/yt/cpp/mapreduce/http/fwd.h new file mode 100644 index 00000000000..62891731f6c --- /dev/null +++ b/yt/cpp/mapreduce/http/fwd.h @@ -0,0 +1,26 @@ +#pragma once + +#include <memory> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TClientContext; +class THttpHeader; + +namespace NHttpClient { + +class IHttpClient; +class IHttpRequest; +class IHttpResponse; + +using IHttpClientPtr = std::shared_ptr<IHttpClient>; +using IHttpResponsePtr = std::unique_ptr<IHttpResponse>; +using IHttpRequestPtr = std::unique_ptr<IHttpRequest>; + +} // namespace NHttpClient + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/helpers.cpp b/yt/cpp/mapreduce/http/helpers.cpp new file mode 100644 index 00000000000..233a565f205 --- /dev/null +++ b/yt/cpp/mapreduce/http/helpers.cpp @@ -0,0 +1,88 @@ +#include "helpers.h" + +#include "context.h" +#include "requests.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT { + +/////////////////////////////////////////////////////////////////////////////// + +TString CreateHostNameWithPort(const TString& hostName, const TClientContext& context) +{ + static constexpr int HttpProxyPort = 80; + static constexpr int HttpsProxyPort = 443; + + static constexpr int TvmOnlyHttpProxyPort = 9026; + static constexpr int TvmOnlyHttpsProxyPort = 9443; + + if (hostName.find(':') == TString::npos) { + int port; + if (context.TvmOnly) { + port = context.UseTLS + ? TvmOnlyHttpsProxyPort + : TvmOnlyHttpProxyPort; + } else { + port = context.UseTLS + ? HttpsProxyPort + : HttpProxyPort; + } + return Format("%v:%v", hostName, port); + } + return hostName; +} + +TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header) +{ + Y_UNUSED(context); + return Format("http://%v%v", hostName, header.GetUrl()); +} + +static TString GetParametersDebugString(const THttpHeader& header) +{ + const auto& parameters = header.GetParameters(); + if (parameters.Empty()) { + return "<empty>"; + } else { + return NodeToYsonString(parameters); + } +} + +TString TruncateForLogs(const TString& text, size_t maxSize) +{ + Y_VERIFY(maxSize > 10); + if (text.empty()) { + static TString empty = "empty"; + return empty; + } else if (text.size() > maxSize) { + TStringStream out; + out << text.substr(0, maxSize) + "... (" << text.size() << " bytes total)"; + return out.Str(); + } else { + return text; + } +} + +TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit) +{ + const auto parametersDebugString = GetParametersDebugString(header); + TStringStream out; + out << "Method: " << url << "; " + << "X-YT-Parameters (sent in " << (includeParameters ? "header" : "body") << "): " << TruncateForLogs(parametersDebugString, sizeLimit); + return out.Str(); +} + +void LogRequest(const THttpHeader& header, const TString& url, bool includeParameters, const TString& requestId, const TString& hostName) +{ + YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; %v)", + requestId, + hostName, + GetLoggedAttributes(header, url, includeParameters, Max<size_t>())); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/helpers.h b/yt/cpp/mapreduce/http/helpers.h new file mode 100644 index 00000000000..0c510fa2e86 --- /dev/null +++ b/yt/cpp/mapreduce/http/helpers.h @@ -0,0 +1,25 @@ +#pragma once + +#include "fwd.h" + +#include "http.h" + +#include <util/generic/fwd.h> + +namespace NYT { + +/////////////////////////////////////////////////////////////////////////////// + +TString CreateHostNameWithPort(const TString& name, const TClientContext& context); + +TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header); + +TString TruncateForLogs(const TString& text, size_t maxSize); + +TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit); + +void LogRequest(const THttpHeader& header, const TString& url, bool includeParameters, const TString& requestId, const TString& hostName); + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp new file mode 100644 index 00000000000..a239dde769a --- /dev/null +++ b/yt/cpp/mapreduce/http/host_manager.cpp @@ -0,0 +1,140 @@ +#include "host_manager.h" + +#include "context.h" +#include "helpers.h" +#include "http.h" +#include "http_client.h" +#include "requests.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/json/json_reader.h> + +#include <util/generic/guid.h> +#include <util/generic/vector.h> +#include <util/generic/singleton.h> +#include <util/generic/ymath.h> + +#include <util/random/random.h> + +#include <util/string/vector.h> + +namespace NYT::NPrivate { + +//////////////////////////////////////////////////////////////////////////////// + +static TVector<TString> ParseJsonStringArray(const TString& response) +{ + NJson::TJsonValue value; + TStringInput input(response); + NJson::ReadJsonTree(&input, &value); + + const NJson::TJsonValue::TArray& array = value.GetArray(); + TVector<TString> result; + result.reserve(array.size()); + for (size_t i = 0; i < array.size(); ++i) { + result.push_back(array[i].GetString()); + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +class THostManager::TClusterHostList +{ +public: + explicit TClusterHostList(TVector<TString> hosts) + : Hosts_(std::move(hosts)) + , Timestamp_(TInstant::Now()) + { } + + explicit TClusterHostList(std::exception_ptr error) + : Error_(std::move(error)) + , Timestamp_(TInstant::Now()) + { } + + TString ChooseHostOrThrow() const + { + if (Error_) { + std::rethrow_exception(Error_); + } + + if (Hosts_.empty()) { + ythrow yexception() << "fetched list of proxies is empty"; + } + + return Hosts_[RandomNumber<size_t>(Hosts_.size())]; + } + + TDuration GetAge() const + { + return TInstant::Now() - Timestamp_; + } + +private: + TVector<TString> Hosts_; + std::exception_ptr Error_; + TInstant Timestamp_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +THostManager& THostManager::Get() +{ + return *Singleton<THostManager>(); +} + +void THostManager::Reset() +{ + auto guard = Guard(Lock_); + ClusterHosts_.clear(); +} + +TString THostManager::GetProxyForHeavyRequest(const TClientContext& context) +{ + auto cluster = context.ServerName; + { + auto guard = Guard(Lock_); + auto it = ClusterHosts_.find(cluster); + if (it != ClusterHosts_.end() && it->second.GetAge() < context.Config->HostListUpdateInterval) { + return it->second.ChooseHostOrThrow(); + } + } + + auto hostList = GetHosts(context); + auto result = hostList.ChooseHostOrThrow(); + { + auto guard = Guard(Lock_); + ClusterHosts_.emplace(cluster, std::move(hostList)); + } + return result; +} + +THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& context) +{ + TString hostsEndpoint = context.Config->Hosts; + while (hostsEndpoint.StartsWith("/")) { + hostsEndpoint = hostsEndpoint.substr(1); + } + THttpHeader header("GET", hostsEndpoint, false); + + try { + auto hostName = context.ServerName; + auto requestId = CreateGuidAsString(); + // TODO: we need to set socket timeout here + auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + auto hosts = ParseJsonStringArray(response->GetResponse()); + for (auto& host : hosts) { + host = CreateHostNameWithPort(host, context); + } + return TClusterHostList(std::move(hosts)); + } catch (const std::exception& e) { + return TClusterHostList(std::current_exception()); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NPrivate diff --git a/yt/cpp/mapreduce/http/host_manager.h b/yt/cpp/mapreduce/http/host_manager.h new file mode 100644 index 00000000000..fdbb740566a --- /dev/null +++ b/yt/cpp/mapreduce/http/host_manager.h @@ -0,0 +1,37 @@ +#pragma once + +#include "fwd.h" + +#include <util/generic/string.h> +#include <util/generic/hash.h> +#include <util/system/spinlock.h> + + +namespace NYT::NPrivate { + +//////////////////////////////////////////////////////////////////////////////// + +class THostManager +{ +public: + static THostManager& Get(); + + TString GetProxyForHeavyRequest(const TClientContext& context); + + // For testing purposes only. + void Reset(); + +private: + class TClusterHostList; + +private: + TAdaptiveLock Lock_; + THashMap<TString, TClusterHostList> ClusterHosts_; + +private: + static TClusterHostList GetHosts(const TClientContext& context); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NPrivate diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp new file mode 100644 index 00000000000..d44b2638a00 --- /dev/null +++ b/yt/cpp/mapreduce/http/http.cpp @@ -0,0 +1,1014 @@ +#include "http.h" + +#include "abortable_http_response.h" +#include "core.h" +#include "helpers.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/yt/core/http/http.h> + +#include <library/cpp/json/json_writer.h> + +#include <library/cpp/string_utils/base64/base64.h> +#include <library/cpp/string_utils/quote/quote.h> + +#include <util/generic/singleton.h> +#include <util/generic/algorithm.h> + +#include <util/stream/mem.h> + +#include <util/string/builder.h> +#include <util/string/cast.h> +#include <util/string/escape.h> +#include <util/string/printf.h> + +#include <util/system/byteorder.h> +#include <util/system/getpid.h> + +#include <exception> + + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRequest::TRequestStream + : public IOutputStream +{ +public: + TRequestStream(THttpRequest* httpRequest, const TSocket& s) + : HttpRequest_(httpRequest) + , SocketOutput_(s) + , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_)) + { + HttpOutput_.EnableKeepAlive(true); + } + +private: + void DoWrite(const void* buf, size_t len) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(buf, len); + }); + } + + void DoWriteV(const TPart* parts, size_t count) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(parts, count); + }); + } + + void DoWriteC(char ch) override + { + WrapWriteFunc([&] { + HttpOutput_.Write(ch); + }); + } + + void DoFlush() override + { + WrapWriteFunc([&] { + HttpOutput_.Flush(); + }); + } + + void DoFinish() override + { + WrapWriteFunc([&] { + HttpOutput_.Finish(); + }); + } + + void WrapWriteFunc(std::function<void()> func) + { + CheckErrorState(); + try { + func(); + } catch (const std::exception&) { + HandleWriteException(); + } + } + + // In many cases http proxy stops reading request and resets connection + // if error has happend. This function tries to read error response + // in such cases. + void HandleWriteException() { + Y_VERIFY(WriteError_ == nullptr); + WriteError_ = std::current_exception(); + Y_VERIFY(WriteError_ != nullptr); + try { + HttpRequest_->GetResponseStream(); + } catch (const TErrorResponse &) { + throw; + } catch (...) { + } + std::rethrow_exception(WriteError_); + } + + void CheckErrorState() + { + if (WriteError_) { + std::rethrow_exception(WriteError_); + } + } + +private: + THttpRequest* const HttpRequest_; + TSocketOutput SocketOutput_; + THttpOutput HttpOutput_; + std::exception_ptr WriteError_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi) + : Method(method) + , Command(command) + , IsApi(isApi) +{ } + +void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite) +{ + auto it = Parameters.find(key); + if (it == Parameters.end()) { + Parameters.emplace(key, std::move(value)); + } else { + if (overwrite) { + it->second = std::move(value); + } else { + ythrow yexception() << "Duplicate key: " << key; + } + } +} + +void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite) +{ + for (const auto& p : newParameters.AsMap()) { + AddParameter(p.first, p.second, overwrite); + } +} + +void THttpHeader::RemoveParameter(const TString& key) +{ + Parameters.erase(key); +} + +TNode THttpHeader::GetParameters() const +{ + return Parameters; +} + +void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite) +{ + if (transactionId) { + AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite); + } else { + RemoveParameter("transaction_id"); + } +} + +void THttpHeader::AddPath(const TString& path, bool overwrite) +{ + AddParameter("path", path, overwrite); +} + +void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite) +{ + AddParameter("operation_id", GetGuidAsString(operationId), overwrite); +} + +void THttpHeader::AddMutationId() +{ + TGUID guid; + + // Some users use `fork()' with yt wrapper + // (actually they use python + multiprocessing) + // and CreateGuid is not resistant to `fork()', so spice it a little bit. + // + // Check IGNIETFERRO-610 + CreateGuid(&guid); + guid.dw[2] = GetPID() ^ MicroSeconds(); + + AddParameter("mutation_id", GetGuidAsString(guid), true); +} + +bool THttpHeader::HasMutationId() const +{ + return Parameters.contains("mutation_id"); +} + +void THttpHeader::SetToken(const TString& token) +{ + Token = token; +} + +void THttpHeader::SetImpersonationUser(const TString& impersonationUser) +{ + ImpersonationUser = impersonationUser; +} + +void THttpHeader::SetServiceTicket(const TString& ticket) +{ + ServiceTicket = ticket; +} + +void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format) +{ + InputFormat = format; +} + +void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format) +{ + OutputFormat = format; +} + +TMaybe<TFormat> THttpHeader::GetOutputFormat() const +{ + return OutputFormat; +} + +void THttpHeader::SetRequestCompression(const TString& compression) +{ + RequestCompression = compression; +} + +void THttpHeader::SetResponseCompression(const TString& compression) +{ + ResponseCompression = compression; +} + +TString THttpHeader::GetCommand() const +{ + return Command; +} + +TString THttpHeader::GetUrl() const +{ + TStringStream url; + + if (IsApi) { + url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command; + } else { + url << "/" << Command; + } + + return url.Str(); +} + +bool THttpHeader::ShouldAcceptFraming() const +{ + return TConfig::Get()->CommandsWithFraming.contains(Command); +} + +TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const +{ + TStringStream result; + + result << Method << " " << GetUrl() << " HTTP/1.1\r\n"; + + GetHeader(hostName, requestId, includeParameters).Get()->WriteTo(&result); + + if (ShouldAcceptFraming()) { + result << "X-YT-Accept-Framing: 1\r\n"; + } + + result << "\r\n"; + + return result.Str(); +} + +NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const +{ + auto headers = New<NHttp::THeaders>(); + + headers->Add("Host", hostName); + headers->Add("User-Agent", TProcessState::Get()->ClientVersion); + + if (!Token.empty()) { + headers->Add("Authorization", "OAuth " + Token); + } + if (!ServiceTicket.empty()) { + headers->Add("X-Ya-Service-Ticket", ServiceTicket); + } + if (!ImpersonationUser.empty()) { + headers->Add("X-Yt-User-Name", ImpersonationUser); + } + + if (Method == "PUT" || Method == "POST") { + headers->Add("Transfer-Encoding", "chunked"); + } + + headers->Add("X-YT-Correlation-Id", requestId); + headers->Add("X-YT-Header-Format", "<format=text>yson"); + + headers->Add("Content-Encoding", RequestCompression); + headers->Add("Accept-Encoding", ResponseCompression); + + auto printYTHeader = [&headers] (const char* headerName, const TString& value) { + static const size_t maxHttpHeaderSize = 64 << 10; + if (!value) { + return; + } + if (value.size() <= maxHttpHeaderSize) { + headers->Add(headerName, value); + return; + } + + TString encoded; + Base64Encode(value, encoded); + auto ptr = encoded.data(); + auto finish = encoded.data() + encoded.size(); + size_t index = 0; + do { + auto end = Min(ptr + maxHttpHeaderSize, finish); + headers->Add(Format("%v%v", headerName, index++), TString(ptr, end)); + ptr = end; + } while (ptr != finish); + }; + + if (InputFormat) { + printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config)); + } + if (OutputFormat) { + printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config)); + } + if (includeParameters) { + printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters)); + } + + return NHttp::THeadersPtrWrapper(std::move(headers)); +} + +const TString& THttpHeader::GetMethod() const +{ + return Method; +} + +//////////////////////////////////////////////////////////////////////////////// + +TAddressCache* TAddressCache::Get() +{ + return Singleton<TAddressCache>(); +} + +bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address) +{ + if (!TConfig::Get()->ForceIpV4 && !TConfig::Get()->ForceIpV6) { + return true; + } + + for (auto i = address->Begin(); i != address->End(); ++i) { + const auto& addressInfo = *i; + if (TConfig::Get()->ForceIpV4 && addressInfo.ai_family == AF_INET) { + return true; + } + if (TConfig::Get()->ForceIpV6 && addressInfo.ai_family == AF_INET6) { + return true; + } + } + return false; +} + +TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName) +{ + auto address = FindAddress(hostName); + if (address) { + return address; + } + + TString host(hostName); + ui16 port = 80; + + auto colon = hostName.find(':'); + if (colon != TString::npos) { + port = FromString<ui16>(hostName.substr(colon + 1)); + host = hostName.substr(0, colon); + } + + auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get()); + auto error = yexception() << "can not resolve address of required version for host " << hostName; + while (true) { + address = new TNetworkAddress(host, port); + if (ContainsAddressOfRequiredVersion(address)) { + break; + } + retryPolicy->NotifyNewAttempt(); + YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v", + hostName, + retryPolicy->GetAttemptDescription()); + if (auto backoffDuration = retryPolicy->OnGenericError(error)) { + NDetail::TWaitProxy::Get()->Sleep(*backoffDuration); + } else { + ythrow error; + } + } + + AddAddress(hostName, address); + return address; +} + +TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const +{ + TCacheEntry entry; + { + TReadGuard guard(Lock_); + auto it = Cache_.find(hostName); + if (it == Cache_.end()) { + return nullptr; + } + entry = it->second; + } + + if (TInstant::Now() > entry.ExpirationTime) { + YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution", + hostName); + return nullptr; + } + + if (!ContainsAddressOfRequiredVersion(entry.Address)) { + YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution", + hostName); + return nullptr; + } + + return entry.Address; +} + +void TAddressCache::AddAddress(TString hostName, TAddressPtr address) +{ + auto entry = TCacheEntry{ + .Address = std::move(address), + .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout, + }; + + { + TWriteGuard guard(Lock_); + Cache_.emplace(std::move(hostName), std::move(entry)); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TConnectionPool* TConnectionPool::Get() +{ + return Singleton<TConnectionPool>(); +} + +TConnectionPtr TConnectionPool::Connect( + const TString& hostName, + TDuration socketTimeout) +{ + Refresh(); + + if (socketTimeout == TDuration::Zero()) { + socketTimeout = TConfig::Get()->SocketTimeout; + } + + { + auto guard = Guard(Lock_); + auto now = TInstant::Now(); + auto range = Connections_.equal_range(hostName); + for (auto it = range.first; it != range.second; ++it) { + auto& connection = it->second; + if (connection->DeadLine < now) { + continue; + } + if (!AtomicCas(&connection->Busy, 1, 0)) { + continue; + } + + connection->DeadLine = now + socketTimeout; + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + return connection; + } + } + + TConnectionPtr connection(new TConnection); + + auto networkAddress = TAddressCache::Get()->Resolve(hostName); + TSocketHolder socket(DoConnect(networkAddress)); + SetNonBlock(socket, false); + + connection->Socket.Reset(new TSocket(socket.Release())); + + connection->DeadLine = TInstant::Now() + socketTimeout; + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + + { + auto guard = Guard(Lock_); + static ui32 connectionId = 0; + connection->Id = ++connectionId; + Connections_.insert({hostName, connection}); + } + + YT_LOG_DEBUG("New connection to %v #%v opened", + hostName, + connection->Id); + + return connection; +} + +void TConnectionPool::Release(TConnectionPtr connection) +{ + auto socketTimeout = TConfig::Get()->SocketTimeout; + auto newDeadline = TInstant::Now() + socketTimeout; + + { + auto guard = Guard(Lock_); + connection->DeadLine = newDeadline; + } + + connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); + AtomicSet(connection->Busy, 0); + + Refresh(); +} + +void TConnectionPool::Invalidate( + const TString& hostName, + TConnectionPtr connection) +{ + auto guard = Guard(Lock_); + auto range = Connections_.equal_range(hostName); + for (auto it = range.first; it != range.second; ++it) { + if (it->second == connection) { + YT_LOG_DEBUG("Closing connection #%v", + connection->Id); + Connections_.erase(it); + return; + } + } +} + +void TConnectionPool::Refresh() +{ + auto guard = Guard(Lock_); + + // simple, since we don't expect too many connections + using TItem = std::pair<TInstant, TConnectionMap::iterator>; + std::vector<TItem> sortedConnections; + for (auto it = Connections_.begin(); it != Connections_.end(); ++it) { + sortedConnections.emplace_back(it->second->DeadLine, it); + } + + std::sort( + sortedConnections.begin(), + sortedConnections.end(), + [] (const TItem& a, const TItem& b) -> bool { + return a.first < b.first; + }); + + auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize; + + const auto now = TInstant::Now(); + for (const auto& item : sortedConnections) { + const auto& mapIterator = item.second; + auto connection = mapIterator->second; + if (AtomicGet(connection->Busy)) { + continue; + } + + if (removeCount > 0) { + Connections_.erase(mapIterator); + YT_LOG_DEBUG("Closing connection #%v (too many opened connections)", + connection->Id); + --removeCount; + continue; + } + + if (connection->DeadLine < now) { + Connections_.erase(mapIterator); + YT_LOG_DEBUG("Closing connection #%v (timeout)", + connection->Id); + } + } +} + +SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address) +{ + int lastError = 0; + + for (auto i = address->Begin(); i != address->End(); ++i) { + struct addrinfo* info = &*i; + + if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) { + continue; + } + + if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) { + continue; + } + + TSocketHolder socket( + ::socket(info->ai_family, info->ai_socktype, info->ai_protocol)); + + if (socket.Closed()) { + lastError = LastSystemError(); + continue; + } + + SetNonBlock(socket, true); + if (TConfig::Get()->SocketPriority) { + SetSocketPriority(socket, *TConfig::Get()->SocketPriority); + } + + if (connect(socket, info->ai_addr, info->ai_addrlen) == 0) + return socket.Release(); + + int err = LastSystemError(); + if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) { + struct pollfd p = { + socket, + POLLOUT, + 0 + }; + const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout); + if (n < 0) { + ythrow TSystemError(-(int)n) << "can not connect to " << info; + } + CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error"); + if (!err) + return socket.Release(); + } + + lastError = err; + continue; + } + + ythrow TSystemError(lastError) << "can not connect to " << *address; +} + +//////////////////////////////////////////////////////////////////////////////// + +static TMaybe<TString> GetProxyName(const THttpInput& input) +{ + if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) { + return proxyHeader->Value(); + } + return Nothing(); +} + +THttpResponse::THttpResponse( + IInputStream* socketStream, + const TString& requestId, + const TString& hostName) + : HttpInput_(socketStream) + , RequestId_(requestId) + , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName)) + , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing")) +{ + HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine()); + if (HttpCode_ == 200 || HttpCode_ == 202) { + return; + } + + ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_); + + auto logAndSetError = [&] (const TString& rawError) { + YT_LOG_ERROR("RSP %v - HTTP %v - %v", + RequestId_, + HttpCode_, + rawError.data()); + ErrorResponse_->SetRawError(rawError); + }; + + switch (HttpCode_) { + case 429: + logAndSetError("request rate limit exceeded"); + break; + + case 500: + logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_); + break; + + default: { + TStringStream httpHeaders; + httpHeaders << "HTTP headers ("; + for (const auto& header : HttpInput_.Headers()) { + httpHeaders << header.Name() << ": " << header.Value() << "; "; + } + httpHeaders << ")"; + + auto errorString = Sprintf("RSP %s - HTTP %d - %s", + RequestId_.data(), + HttpCode_, + httpHeaders.Str().data()); + + YT_LOG_ERROR("%v", + errorString.data()); + + if (auto parsedResponse = ParseError(HttpInput_.Headers())) { + ErrorResponse_ = parsedResponse.GetRef(); + } else { + ErrorResponse_->SetRawError( + errorString + " - X-YT-Error is missing in headers"); + } + break; + } + } +} + +const THttpHeaders& THttpResponse::Headers() const +{ + return HttpInput_.Headers(); +} + +void THttpResponse::CheckErrorResponse() const +{ + if (ErrorResponse_) { + throw *ErrorResponse_; + } +} + +bool THttpResponse::IsExhausted() const +{ + return IsExhausted_; +} + +int THttpResponse::GetHttpCode() const +{ + return HttpCode_; +} + +const TString& THttpResponse::GetHostName() const +{ + return HostName_; +} + +bool THttpResponse::IsKeepAlive() const +{ + return HttpInput_.IsKeepAlive(); +} + +TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers) +{ + for (const auto& header : headers) { + if (header.Name() == "X-YT-Error") { + TErrorResponse errorResponse(HttpCode_, RequestId_); + errorResponse.ParseFromJsonError(header.Value()); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + } + return Nothing(); +} + +size_t THttpResponse::DoRead(void* buf, size_t len) +{ + size_t read; + if (Unframe_) { + read = UnframeRead(buf, len); + } else { + read = HttpInput_.Read(buf, len); + } + if (read == 0 && len != 0) { + // THttpInput MUST return defined (but may be empty) + // trailers when it is exhausted. + Y_VERIFY(HttpInput_.Trailers().Defined(), + "trailers MUST be defined for exhausted stream"); + CheckTrailers(HttpInput_.Trailers().GetRef()); + IsExhausted_ = true; + } + return read; +} + +size_t THttpResponse::DoSkip(size_t len) +{ + size_t skipped; + if (Unframe_) { + skipped = UnframeSkip(len); + } else { + skipped = HttpInput_.Skip(len); + } + if (skipped == 0 && len != 0) { + // THttpInput MUST return defined (but may be empty) + // trailers when it is exhausted. + Y_VERIFY(HttpInput_.Trailers().Defined(), + "trailers MUST be defined for exhausted stream"); + CheckTrailers(HttpInput_.Trailers().GetRef()); + IsExhausted_ = true; + } + return skipped; +} + +void THttpResponse::CheckTrailers(const THttpHeaders& trailers) +{ + if (auto errorResponse = ParseError(trailers)) { + errorResponse->SetIsFromTrailers(true); + YT_LOG_ERROR("RSP %v - %v", + RequestId_, + errorResponse.GetRef().what()); + ythrow errorResponse.GetRef(); + } +} + +static ui32 ReadDataFrameSize(THttpInput* stream) +{ + ui32 littleEndianSize; + auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize)); + if (read < sizeof(littleEndianSize)) { + ythrow yexception() << "Bad data frame header: " << + "expected " << sizeof(littleEndianSize) << " bytes, got " << read; + } + return LittleToHost(littleEndianSize); +} + +bool THttpResponse::RefreshFrameIfNecessary() +{ + while (RemainingFrameSize_ == 0) { + ui8 frameTypeByte; + auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte)); + if (read == 0) { + return false; + } + auto frameType = static_cast<EFrameType>(frameTypeByte); + switch (frameType) { + case EFrameType::KeepAlive: + break; + case EFrameType::Data: + RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_); + break; + default: + ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); + } + } + return true; +} + +size_t THttpResponse::UnframeRead(void* buf, size_t len) +{ + if (!RefreshFrameIfNecessary()) { + return 0; + } + auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_)); + RemainingFrameSize_ -= read; + return read; +} + +size_t THttpResponse::UnframeSkip(size_t len) +{ + if (!RefreshFrameIfNecessary()) { + return 0; + } + auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_)); + RemainingFrameSize_ -= skipped; + return skipped; +} + +//////////////////////////////////////////////////////////////////////////////// + +THttpRequest::THttpRequest() +{ + RequestId = CreateGuidAsString(); +} + +THttpRequest::THttpRequest(const TString& requestId) + : RequestId(requestId) +{ } + +THttpRequest::~THttpRequest() +{ + if (!Connection) { + return; + } + + if (Input && Input->IsKeepAlive() && Input->IsExhausted()) { + // We should return to the pool only connections where HTTP response was fully read. + // Otherwise next reader might read our remaining data and misinterpret them (YT-6510). + TConnectionPool::Get()->Release(Connection); + } else { + TConnectionPool::Get()->Invalidate(HostName, Connection); + } +} + +TString THttpRequest::GetRequestId() const +{ + return RequestId; +} + +void THttpRequest::Connect(TString hostName, TDuration socketTimeout) +{ + HostName = std::move(hostName); + YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool", + RequestId, + HostName); + + StartTime_ = TInstant::Now(); + Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout); + + YT_LOG_DEBUG("REQ %v - connection #%v", + RequestId, + Connection->Id); +} + +IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters) +{ + auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters); + Url_ = header.GetUrl(); + + LogRequest(header, Url_, includeParameters, RequestId, HostName); + + LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128); + + auto outputFormat = header.GetOutputFormat(); + if (outputFormat && outputFormat->IsTextYson()) { + LogResponse = true; + } + + RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get()); + + RequestStream_->Write(strHeader.data(), strHeader.size()); + return RequestStream_.Get(); +} + +IOutputStream* THttpRequest::StartRequest(const THttpHeader& header) +{ + return StartRequestImpl(header, true); +} + +void THttpRequest::FinishRequest() +{ + RequestStream_->Flush(); + RequestStream_->Finish(); +} + +void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body) +{ + if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { + const auto& parameters = header.GetParameters(); + auto parametersStr = NodeToYsonString(parameters); + auto* output = StartRequestImpl(header, false); + output->Write(parametersStr); + FinishRequest(); + } else { + auto* output = StartRequest(header); + if (body) { + output->Write(*body); + } + FinishRequest(); + } +} + +THttpResponse* THttpRequest::GetResponseStream() +{ + if (!Input) { + SocketInput.Reset(new TSocketInput(*Connection->Socket.Get())); + if (TConfig::Get()->UseAbortableResponse) { + Y_VERIFY(!Url_.empty()); + Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_)); + } else { + Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName)); + } + Input->CheckErrorResponse(); + } + return Input.Get(); +} + +TString THttpRequest::GetResponse() +{ + TString result = GetResponseStream()->ReadAll(); + + TStringStream loggedAttributes; + loggedAttributes + << "Time: " << TInstant::Now() - StartTime_ << "; " + << "HostName: " << GetResponseStream()->GetHostName() << "; " + << LoggedAttributes_; + + if (LogResponse) { + constexpr auto sizeLimit = 1 << 7; + YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", + RequestId, + TruncateForLogs(result, sizeLimit), + loggedAttributes.Str()); + } else { + YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", + RequestId, + result.size(), + loggedAttributes.Str()); + } + return result; +} + +int THttpRequest::GetHttpCode() { + return GetResponseStream()->GetHttpCode(); +} + +void THttpRequest::InvalidateConnection() +{ + TConnectionPool::Get()->Invalidate(HostName, Connection); + Connection.Reset(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h new file mode 100644 index 00000000000..ee8783088db --- /dev/null +++ b/yt/cpp/mapreduce/http/http.h @@ -0,0 +1,256 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/format.h> +#include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/interface/node.h> + +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/http/io/stream.h> + +#include <util/generic/hash.h> +#include <util/generic/hash_multi_map.h> +#include <util/generic/strbuf.h> +#include <util/generic/guid.h> +#include <util/network/socket.h> +#include <util/stream/input.h> +#include <util/system/mutex.h> +#include <util/system/rwlock.h> +#include <util/generic/ptr.h> + +namespace NYT { + +class TNode; + +namespace NHttp { + +struct THeadersPtrWrapper; + +} // NHttp + +/////////////////////////////////////////////////////////////////////////////// + +enum class EFrameType +{ + Data = 0x01, + KeepAlive = 0x02, +}; + + +class THttpHeader +{ +public: + THttpHeader(const TString& method, const TString& command, bool isApi = true); + + void AddParameter(const TString& key, TNode value, bool overwrite = false); + void RemoveParameter(const TString& key); + void MergeParameters(const TNode& parameters, bool overwrite = false); + TNode GetParameters() const; + + void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false); + void AddPath(const TString& path, bool overwrite = false); + void AddOperationId(const TOperationId& operationId, bool overwrite = false); + void AddMutationId(); + bool HasMutationId() const; + + void SetToken(const TString& token); + void SetImpersonationUser(const TString& impersonationUser); + + void SetServiceTicket(const TString& ticket); + + void SetInputFormat(const TMaybe<TFormat>& format); + + void SetOutputFormat(const TMaybe<TFormat>& format); + TMaybe<TFormat> GetOutputFormat() const; + + void SetRequestCompression(const TString& compression); + void SetResponseCompression(const TString& compression); + + TString GetCommand() const; + TString GetUrl() const; + TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const; + NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const; + + const TString& GetMethod() const; + +private: + bool ShouldAcceptFraming() const; + +private: + const TString Method; + const TString Command; + const bool IsApi; + + TNode::TMapType Parameters; + TString ImpersonationUser; + TString Token; + TString ServiceTicket; + TNode Attributes; + +private: + TMaybe<TFormat> InputFormat = TFormat::YsonText(); + TMaybe<TFormat> OutputFormat = TFormat::YsonText(); + + TString RequestCompression = "identity"; + TString ResponseCompression = "identity"; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TAddressCache +{ +public: + using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>; + + static TAddressCache* Get(); + + TAddressPtr Resolve(const TString& hostName); + +private: + struct TCacheEntry { + TAddressPtr Address; + TInstant ExpirationTime; + }; + +private: + TAddressPtr FindAddress(const TString& hostName) const; + void AddAddress(TString hostName, TAddressPtr address); + +private: + TRWMutex Lock_; + THashMap<TString, TCacheEntry> Cache_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TConnection +{ + THolder<TSocket> Socket; + TAtomic Busy = 1; + TInstant DeadLine; + ui32 Id; +}; + +using TConnectionPtr = TAtomicSharedPtr<TConnection>; + +class TConnectionPool +{ +public: + using TConnectionMap = THashMultiMap<TString, TConnectionPtr>; + + static TConnectionPool* Get(); + + TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout); + void Release(TConnectionPtr connection); + void Invalidate(const TString& hostName, TConnectionPtr connection); + +private: + void Refresh(); + static SOCKET DoConnect(TAddressCache::TAddressPtr address); + +private: + TMutex Lock_; + TConnectionMap Connections_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// +// Input stream that handles YT-specific header/trailer errors +// and throws TErrorResponse if it finds any. +class THttpResponse + : public IInputStream +{ +public: + // 'requestId' and 'hostName' are provided for debug reasons + // (they will appear in some error messages). + THttpResponse( + IInputStream* socketStream, + const TString& requestId, + const TString& hostName); + + const THttpHeaders& Headers() const; + + void CheckErrorResponse() const; + bool IsExhausted() const; + int GetHttpCode() const; + const TString& GetHostName() const; + bool IsKeepAlive() const; + +protected: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + void CheckTrailers(const THttpHeaders& trailers); + TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers); + size_t UnframeRead(void* buf, size_t len); + size_t UnframeSkip(size_t len); + bool RefreshFrameIfNecessary(); + +private: + THttpInput HttpInput_; + const TString RequestId_; + const TString HostName_; + int HttpCode_ = 0; + TMaybe<TErrorResponse> ErrorResponse_; + bool IsExhausted_ = false; + const bool Unframe_; + size_t RemainingFrameSize_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRequest +{ +public: + THttpRequest(); + THttpRequest(const TString& requestId); + ~THttpRequest(); + + TString GetRequestId() const; + + void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero()); + + IOutputStream* StartRequest(const THttpHeader& header); + void FinishRequest(); + + void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body); + + THttpResponse* GetResponseStream(); + + TString GetResponse(); + + void InvalidateConnection(); + + int GetHttpCode(); + +private: + IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters); + +private: + class TRequestStream; + +private: + TString HostName; + TString RequestId; + TString Url_; + TInstant StartTime_; + TString LoggedAttributes_; + + TConnectionPtr Connection; + + THolder<TRequestStream> RequestStream_; + + THolder<TSocketInput> SocketInput; + THolder<THttpResponse> Input; + + bool LogResponse = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp new file mode 100644 index 00000000000..a2af1182dcf --- /dev/null +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -0,0 +1,603 @@ +#include "http_client.h" + +#include "abortable_http_response.h" +#include "core.h" +#include "helpers.h" +#include "http.h" + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/yt/core/concurrency/thread_pool_poller.h> + +#include <yt/yt/core/http/client.h> +#include <yt/yt/core/http/config.h> +#include <yt/yt/core/http/http.h> + +#include <yt/yt/core/https/client.h> +#include <yt/yt/core/https/config.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT::NHttpClient { + +namespace { + +TString CreateHost(TStringBuf host, TStringBuf port) +{ + if (!port.empty()) { + return Format("%v:%v", host, port); + } + + return TString(host); +} + +TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response) +{ + auto httpCode = response->GetStatusCode(); + if (httpCode == NHttp::EStatusCode::OK || httpCode == NHttp::EStatusCode::Accepted) { + return {}; + } + + TErrorResponse errorResponse(static_cast<int>(httpCode), requestId); + + auto logAndSetError = [&] (const TString& rawError) { + YT_LOG_ERROR("RSP %v - HTTP %v - %v", + requestId, + httpCode, + rawError.data()); + errorResponse.SetRawError(rawError); + }; + + switch (httpCode) { + case NHttp::EStatusCode::TooManyRequests: + logAndSetError("request rate limit exceeded"); + break; + + case NHttp::EStatusCode::InternalServerError: + logAndSetError("internal error in proxy " + hostName); + break; + + default: { + TStringStream httpHeaders; + httpHeaders << "HTTP headers ("; + for (const auto& [headerName, headerValue] : response->GetHeaders()->Dump()) { + httpHeaders << headerName << ": " << headerValue << "; "; + } + httpHeaders << ")"; + + auto errorString = Sprintf("RSP %s - HTTP %d - %s", + requestId.data(), + static_cast<int>(httpCode), + httpHeaders.Str().data()); + + YT_LOG_ERROR("%v", + errorString.data()); + + if (auto errorHeader = response->GetHeaders()->Find("X-YT-Error")) { + errorResponse.ParseFromJsonError(*errorHeader); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + + errorResponse.SetRawError( + errorString + " - X-YT-Error is missing in headers"); + break; + } + } + + return errorResponse; +} + +void CheckErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response) +{ + auto errorResponse = GetErrorResponse(hostName, requestId, response); + if (errorResponse) { + throw *errorResponse; + } +} + +} // namespace + +/////////////////////////////////////////////////////////////////////////////// + +class TDefaultHttpResponse + : public IHttpResponse +{ +public: + TDefaultHttpResponse(std::unique_ptr<THttpRequest> request) + : Request_(std::move(request)) + { } + + int GetStatusCode() override + { + return Request_->GetHttpCode(); + } + + IInputStream* GetResponseStream() override + { + return Request_->GetResponseStream(); + } + + TString GetResponse() override + { + return Request_->GetResponse(); + } + + TString GetRequestId() const override + { + return Request_->GetRequestId(); + } + +private: + std::unique_ptr<THttpRequest> Request_; +}; + +class TDefaultHttpRequest + : public IHttpRequest +{ +public: + TDefaultHttpRequest(std::unique_ptr<THttpRequest> request, IOutputStream* stream) + : Request_(std::move(request)) + , Stream_(stream) + { } + + IOutputStream* GetStream() override + { + return Stream_; + } + + IHttpResponsePtr Finish() override + { + Request_->FinishRequest(); + return std::make_unique<TDefaultHttpResponse>(std::move(Request_)); + } + +private: + std::unique_ptr<THttpRequest> Request_; + IOutputStream* Stream_; +}; + +class TDefaultHttpClient + : public IHttpClient +{ +public: + IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override + { + auto request = std::make_unique<THttpRequest>(requestId); + + auto urlRef = NHttp::ParseUrl(url); + + request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + request->SmallRequest(header, body); + return std::make_unique<TDefaultHttpResponse>(std::move(request)); + } + + IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override + { + auto request = std::make_unique<THttpRequest>(requestId); + + auto urlRef = NHttp::ParseUrl(url); + + request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout); + auto stream = request->StartRequest(header); + return std::make_unique<TDefaultHttpRequest>(std::move(request), stream); + } +}; + +/////////////////////////////////////////////////////////////////////////////// + +struct TCoreRequestContext +{ + TString HostName; + TString Url; + TString RequestId; + bool LogResponse; + TInstant StartTime; + TString LoggedAttributes; +}; + +class TCoreHttpResponse + : public IHttpResponse +{ +public: + TCoreHttpResponse( + TCoreRequestContext context, + NHttp::IResponsePtr response) + : Context_(std::move(context)) + , Response_(std::move(response)) + { } + + int GetStatusCode() override + { + return static_cast<int>(Response_->GetStatusCode()); + } + + IInputStream* GetResponseStream() override + { + if (!Stream_) { + auto stream = std::make_unique<TWrappedStream>( + NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor), + Response_, + Context_.RequestId); + CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); + + if (TConfig::Get()->UseAbortableResponse) { + Y_VERIFY(!Context_.Url.empty()); + Stream_ = std::make_unique<TAbortableCoreHttpResponse>(std::move(stream), Context_.Url); + } else { + Stream_ = std::move(stream); + } + } + + return Stream_.get(); + } + + TString GetResponse() override + { + auto result = GetResponseStream()->ReadAll(); + + TStringStream loggedAttributes; + loggedAttributes + << "Time: " << TInstant::Now() - Context_.StartTime << "; " + << "HostName: " << Context_.HostName << "; " + << Context_.LoggedAttributes; + + if (Context_.LogResponse) { + constexpr auto sizeLimit = 1 << 7; + YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)", + Context_.RequestId, + TruncateForLogs(result, sizeLimit), + loggedAttributes.Str()); + } else { + YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)", + Context_.RequestId, + result.size(), + loggedAttributes.Str()); + } + return result; + } + + TString GetRequestId() const override + { + return Context_.RequestId; + } + +private: + class TWrappedStream + : public IInputStream + { + public: + TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId) + : Underlying_(std::move(underlying)) + , Response_(std::move(response)) + , RequestId_(std::move(requestId)) + { } + + protected: + size_t DoRead(void* buf, size_t len) override + { + size_t read = Underlying_->Read(buf, len); + + if (read == 0 && len != 0) { + CheckTrailers(Response_->GetTrailers()); + } + return read; + } + + size_t DoSkip(size_t len) override + { + size_t skipped = Underlying_->Skip(len); + if (skipped == 0 && len != 0) { + CheckTrailers(Response_->GetTrailers()); + } + return skipped; + } + + private: + void CheckTrailers(const NHttp::THeadersPtr& trailers) + { + if (auto errorResponse = ParseError(trailers)) { + errorResponse->SetIsFromTrailers(true); + YT_LOG_ERROR("RSP %v - %v", + RequestId_, + errorResponse.GetRef().what()); + ythrow errorResponse.GetRef(); + } + } + + TMaybe<TErrorResponse> ParseError(const NHttp::THeadersPtr& headers) + { + if (auto errorHeader = headers->Find("X-YT-Error")) { + TErrorResponse errorResponse(static_cast<int>(Response_->GetStatusCode()), RequestId_); + errorResponse.ParseFromJsonError(*errorHeader); + if (errorResponse.IsOk()) { + return Nothing(); + } + return errorResponse; + } + return Nothing(); + } + + private: + std::unique_ptr<IInputStream> Underlying_; + NHttp::IResponsePtr Response_; + TString RequestId_; + }; + +private: + TCoreRequestContext Context_; + NHttp::IResponsePtr Response_; + std::unique_ptr<IInputStream> Stream_; +}; + +class TCoreHttpRequest + : public IHttpRequest +{ +public: + TCoreHttpRequest(TCoreRequestContext context, NHttp::IActiveRequestPtr activeRequest) + : Context_(std::move(context)) + , ActiveRequest_(std::move(activeRequest)) + , Stream_(NConcurrency::CreateBufferedSyncAdapter(ActiveRequest_->GetRequestStream())) + , WrappedStream_(this, Stream_.get()) + { } + + IOutputStream* GetStream() override + { + return &WrappedStream_; + } + + IHttpResponsePtr Finish() override + { + WrappedStream_.Flush(); + auto response = ActiveRequest_->Finish().Get().ValueOrThrow(); + return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response)); + } + + IHttpResponsePtr FinishWithError() + { + auto response = ActiveRequest_->GetResponse(); + return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response)); + } + +private: + class TWrappedStream + : public IOutputStream + { + public: + TWrappedStream(TCoreHttpRequest* httpRequest, IOutputStream* underlying) + : HttpRequest_(httpRequest) + , Underlying_(underlying) + { } + + private: + void DoWrite(const void* buf, size_t len) override + { + WrapWriteFunc([&] { + Underlying_->Write(buf, len); + }); + } + + void DoWriteV(const TPart* parts, size_t count) override + { + WrapWriteFunc([&] { + Underlying_->Write(parts, count); + }); + } + + void DoWriteC(char ch) override + { + WrapWriteFunc([&] { + Underlying_->Write(ch); + }); + } + + void DoFlush() override + { + WrapWriteFunc([&] { + Underlying_->Flush(); + }); + } + + void DoFinish() override + { + WrapWriteFunc([&] { + Underlying_->Finish(); + }); + } + + void WrapWriteFunc(std::function<void()> func) + { + CheckErrorState(); + try { + func(); + } catch (const std::exception&) { + HandleWriteException(); + } + } + + // In many cases http proxy stops reading request and resets connection + // if error has happend. This function tries to read error response + // in such cases. + void HandleWriteException() { + Y_VERIFY(WriteError_ == nullptr); + WriteError_ = std::current_exception(); + Y_VERIFY(WriteError_ != nullptr); + try { + HttpRequest_->FinishWithError()->GetResponseStream(); + } catch (const TErrorResponse &) { + throw; + } catch (...) { + } + std::rethrow_exception(WriteError_); + } + + void CheckErrorState() + { + if (WriteError_) { + std::rethrow_exception(WriteError_); + } + } + + private: + TCoreHttpRequest* const HttpRequest_; + IOutputStream* Underlying_; + std::exception_ptr WriteError_; + }; + +private: + TCoreRequestContext Context_; + NHttp::IActiveRequestPtr ActiveRequest_; + std::unique_ptr<IOutputStream> Stream_; + TWrappedStream WrappedStream_; +}; + +class TCoreHttpClient + : public IHttpClient +{ +public: + TCoreHttpClient(bool useTLS, const TConfigPtr& config) + : Poller_(NConcurrency::CreateThreadPoolPoller(1, "http_poller")) // TODO(nadya73): YT-18363: move threads count to config + { + if (useTLS) { + auto httpsConfig = NYT::New<NYT::NHttps::TClientConfig>(); + httpsConfig->MaxIdleConnections = config->ConnectionPoolSize; + Client_ = NHttps::CreateClient(httpsConfig, Poller_); + } else { + auto httpConfig = NYT::New<NYT::NHttp::TClientConfig>(); + httpConfig->MaxIdleConnections = config->ConnectionPoolSize; + Client_ = NHttp::CreateClient(httpConfig, Poller_); + } + } + + IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header, TMaybe<TStringBuf> body) override + { + TCoreRequestContext context = CreateContext(url, requestId, header); + + // TODO(nadya73): YT-18363: pass socket timeouts from THttpConfig + + NHttp::IResponsePtr response; + + auto logRequest = [&](bool includeParameters) { + LogRequest(header, url, includeParameters, requestId, context.HostName); + context.LoggedAttributes = GetLoggedAttributes(header, url, includeParameters, 128); + }; + + if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) { + const auto& parameters = header.GetParameters(); + auto parametersStr = NodeToYsonString(parameters); + + bool includeParameters = false; + auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get(); + + logRequest(includeParameters); + + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + activeRequest->GetRequestStream()->Write(TSharedRef::FromString(parametersStr)).Get().ThrowOnError(); + response = activeRequest->Finish().Get().ValueOrThrow(); + } else { + auto bodyRef = TSharedRef::FromString(TString(body ? *body : "")); + bool includeParameters = true; + auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get(); + + logRequest(includeParameters); + + if (header.GetMethod() == "GET") { + response = RequestImpl(header.GetMethod(), url, headers, bodyRef); + } else { + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + auto request = std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest)); + if (body) { + request->GetStream()->Write(*body); + } + return request->Finish(); + } + } + + return std::make_unique<TCoreHttpResponse>(std::move(context), std::move(response)); + } + + IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header) override + { + TCoreRequestContext context = CreateContext(url, requestId, header); + + LogRequest(header, url, true, requestId, context.HostName); + context.LoggedAttributes = GetLoggedAttributes(header, url, true, 128); + + auto headers = header.GetHeader(context.HostName, requestId, true).Get(); + auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers); + + return std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest)); + } + +private: + TCoreRequestContext CreateContext(const TString& url, const TString& requestId, const THttpHeader& header) + { + TCoreRequestContext context; + context.Url = url; + context.RequestId = requestId; + + auto urlRef = NHttp::ParseUrl(url); + context.HostName = CreateHost(urlRef.Host, urlRef.PortStr); + + context.LogResponse = false; + auto outputFormat = header.GetOutputFormat(); + if (outputFormat && outputFormat->IsTextYson()) { + context.LogResponse = true; + } + context.StartTime = TInstant::Now(); + return context; + } + + NHttp::IResponsePtr RequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers, const TSharedRef& body) + { + if (method == "GET") { + return Client_->Get(url, headers).Get().ValueOrThrow(); + } else if (method == "POST") { + return Client_->Post(url, body, headers).Get().ValueOrThrow(); + } else if (method == "PUT") { + return Client_->Put(url, body, headers).Get().ValueOrThrow(); + } else { + YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)", + method, + url); + } + } + + NHttp::IActiveRequestPtr StartRequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers) + { + if (method == "POST") { + return Client_->StartPost(url, headers).Get().ValueOrThrow(); + } else if (method == "PUT") { + return Client_->StartPut(url, headers).Get().ValueOrThrow(); + } else { + YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)", + method, + url); + } + } + + NConcurrency::IThreadPoolPollerPtr Poller_; + NHttp::IClientPtr Client_; +}; + +/////////////////////////////////////////////////////////////////////////////// + +IHttpClientPtr CreateDefaultHttpClient() +{ + return std::make_shared<TDefaultHttpClient>(); +} + +IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config) +{ + return std::make_shared<TCoreHttpClient>(useTLS, config); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttpClient diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h new file mode 100644 index 00000000000..859f0423cb4 --- /dev/null +++ b/yt/cpp/mapreduce/http/http_client.h @@ -0,0 +1,76 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/fwd.h> + +#include <util/datetime/base.h> + +#include <util/generic/maybe.h> +#include <util/generic/string.h> + +#include <util/stream/fwd.h> + +#include <memory> + +namespace NYT::NHttpClient { + +/////////////////////////////////////////////////////////////////////////////// + +struct THttpConfig +{ + TDuration SocketTimeout = TDuration::Zero(); +}; + +/////////////////////////////////////////////////////////////////////////////// + +class IHttpResponse +{ +public: + virtual ~IHttpResponse() = default; + + virtual int GetStatusCode() = 0; + virtual IInputStream* GetResponseStream() = 0; + virtual TString GetResponse() = 0; + virtual TString GetRequestId() const = 0; +}; + +class IHttpRequest +{ +public: + virtual ~IHttpRequest() = default; + + virtual IOutputStream* GetStream() = 0; + virtual IHttpResponsePtr Finish() = 0; +}; + + +class IHttpClient +{ +public: + virtual ~IHttpClient() = default; + + virtual IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body = {}) = 0; + + virtual IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpHeader& header, TMaybe<TStringBuf> body = {}) + { + return Request(url, requestId, /*config*/ {}, header, body); + } + + virtual IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) = 0; + + virtual IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpHeader& header) + { + return StartRequest(url, requestId, /*config*/ {}, header); + } +}; + +/////////////////////////////////////////////////////////////////////////////// + +IHttpClientPtr CreateDefaultHttpClient(); + +IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config); + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NHttpClient diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp new file mode 100644 index 00000000000..7cf0f673bb4 --- /dev/null +++ b/yt/cpp/mapreduce/http/requests.cpp @@ -0,0 +1,66 @@ +#include "requests.h" + +#include "context.h" +#include "host_manager.h" +#include "retry_request.h" + +#include <yt/cpp/mapreduce/client/transaction.h> + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/node_builder.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> +#include <yt/cpp/mapreduce/interface/serialize.h> + +#include <util/stream/file.h> +#include <util/string/builder.h> +#include <util/generic/buffer.h> + + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +bool ParseBoolFromResponse(const TString& response) +{ + return GetBool(NodeFromYsonString(response)); +} + +TGUID ParseGuidFromResponse(const TString& response) +{ + auto node = NodeFromYsonString(response); + return GetGuid(node.AsString()); +} + +//////////////////////////////////////////////////////////////////////////////// + +TString GetProxyForHeavyRequest(const TClientContext& context) +{ + if (!context.Config->UseHosts) { + return context.ServerName; + } + + return NPrivate::THostManager::Get().GetProxyForHeavyRequest(context); +} + +void LogRequestError( + const TString& requestId, + const THttpHeader& header, + const TString& message, + const TString& attemptDescription) +{ + YT_LOG_ERROR("RSP %v - %v - %v - %v - X-YT-Parameters: %v", + requestId, + header.GetUrl(), + message, + attemptDescription, + NodeToYsonString(header.GetParameters())); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/requests.h b/yt/cpp/mapreduce/http/requests.h new file mode 100644 index 00000000000..2c692475d1c --- /dev/null +++ b/yt/cpp/mapreduce/http/requests.h @@ -0,0 +1,29 @@ +#pragma once + +#include "fwd.h" +#include "http.h" + +#include <util/generic/maybe.h> +#include <util/str_stl.h> + +namespace NYT { + +/////////////////////////////////////////////////////////////////////////////// + +bool ParseBoolFromResponse(const TString& response); + +TGUID ParseGuidFromResponse(const TString& response); + +//////////////////////////////////////////////////////////////////////////////// + +TString GetProxyForHeavyRequest(const TClientContext& context); + +void LogRequestError( + const TString& requestId, + const THttpHeader& header, + const TString& message, + const TString& attemptDescription); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp new file mode 100644 index 00000000000..ba116edcf7a --- /dev/null +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -0,0 +1,149 @@ +#include "retry_request.h" + +#include "context.h" +#include "helpers.h" +#include "http_client.h" +#include "requests.h" + +#include <yt/cpp/mapreduce/common/wait_proxy.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/tvm.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT { +namespace NDetail { + +/////////////////////////////////////////////////////////////////////////////// + +static TResponseInfo Request( + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TString& requestId, + const TRequestConfig& config) +{ + TString hostName; + if (config.IsHeavy) { + hostName = GetProxyForHeavyRequest(context); + } else { + hostName = context.ServerName; + } + + auto url = GetFullUrl(hostName, context, header); + + auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body); + + TResponseInfo result; + result.RequestId = requestId; + result.Response = response->GetResponse(); + result.HttpCode = response->GetStatusCode(); + return result; +} + +TResponseInfo RequestWithoutRetry( + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TRequestConfig& config) +{ + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + if (context.ImpersonationUser) { + header.SetImpersonationUser(*context.ImpersonationUser); + } + + if (header.HasMutationId()) { + header.RemoveParameter("retry"); + header.AddMutationId(); + } + auto requestId = CreateGuidAsString(); + return Request(context, header, body, requestId, config); +} + + +TResponseInfo RetryRequestWithPolicy( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body, + const TRequestConfig& config) +{ + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + if (context.ImpersonationUser) { + header.SetImpersonationUser(*context.ImpersonationUser); + } + + bool useMutationId = header.HasMutationId(); + bool retryWithSameMutationId = false; + + if (!retryPolicy) { + retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + } + + while (true) { + auto requestId = CreateGuidAsString(); + try { + retryPolicy->NotifyNewAttempt(); + + if (useMutationId) { + if (retryWithSameMutationId) { + header.AddParameter("retry", true, /* overwrite = */ true); + } else { + header.RemoveParameter("retry"); + header.AddMutationId(); + } + } + + return Request(context, header, body, requestId, config); + } catch (const TErrorResponse& e) { + LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription()); + retryWithSameMutationId = e.IsTransportError(); + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnRetriableError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } catch (const std::exception& e) { + LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription()); + retryWithSameMutationId = true; + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnGenericError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } + } + + Y_FAIL("Retries must have either succeeded or thrown an exception"); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h new file mode 100644 index 00000000000..2210e318f10 --- /dev/null +++ b/yt/cpp/mapreduce/http/retry_request.h @@ -0,0 +1,52 @@ +#pragma once + +#include "fwd.h" + +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/common/fwd.h> + +#include <yt/cpp/mapreduce/http/http_client.h> + +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/string.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////// + +struct TResponseInfo +{ + TString RequestId; + TString Response; + int HttpCode = 0; +}; + +//////////////////////////////////////////////////////////////////// + +struct TRequestConfig +{ + NHttpClient::THttpConfig HttpConfig; + bool IsHeavy = false; +}; + +//////////////////////////////////////////////////////////////////// + +// Retry request with given `header' and `body' using `retryPolicy'. +// If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`. +TResponseInfo RetryRequestWithPolicy( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body = {}, + const TRequestConfig& config = TRequestConfig()); + +TResponseInfo RequestWithoutRetry( + const TClientContext& context, + THttpHeader& header, + TMaybe<TStringBuf> body = {}, + const TRequestConfig& config = TRequestConfig()); + +//////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/http/ya.make b/yt/cpp/mapreduce/http/ya.make new file mode 100644 index 00000000000..ef81a4b64a7 --- /dev/null +++ b/yt/cpp/mapreduce/http/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + abortable_http_response.cpp + context.cpp + helpers.cpp + host_manager.cpp + http.cpp + http_client.cpp + requests.cpp + retry_request.cpp +) + +PEERDIR( + library/cpp/deprecated/atomic + library/cpp/http/io + library/cpp/string_utils/base64 + library/cpp/string_utils/quote + library/cpp/threading/cron + yt/cpp/mapreduce/common + yt/cpp/mapreduce/interface + yt/cpp/mapreduce/interface/logging + yt/yt/core/http + yt/yt/core/https +) + +END() |