aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/http
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears in the open-source CMake build. The diff in the open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r--yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt37
-rw-r--r--yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt38
-rw-r--r--yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt38
-rw-r--r--yt/cpp/mapreduce/http/CMakeLists.txt17
-rw-r--r--yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt34
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp223
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.h142
-rw-r--r--yt/cpp/mapreduce/http/context.cpp25
-rw-r--r--yt/cpp/mapreduce/http/context.h31
-rw-r--r--yt/cpp/mapreduce/http/core.h27
-rw-r--r--yt/cpp/mapreduce/http/fwd.h26
-rw-r--r--yt/cpp/mapreduce/http/helpers.cpp88
-rw-r--r--yt/cpp/mapreduce/http/helpers.h25
-rw-r--r--yt/cpp/mapreduce/http/host_manager.cpp140
-rw-r--r--yt/cpp/mapreduce/http/host_manager.h37
-rw-r--r--yt/cpp/mapreduce/http/http.cpp1014
-rw-r--r--yt/cpp/mapreduce/http/http.h256
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp603
-rw-r--r--yt/cpp/mapreduce/http/http_client.h76
-rw-r--r--yt/cpp/mapreduce/http/requests.cpp66
-rw-r--r--yt/cpp/mapreduce/http/requests.h29
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp149
-rw-r--r--yt/cpp/mapreduce/http/retry_request.h52
-rw-r--r--yt/cpp/mapreduce/http/ya.make29
24 files changed, 3202 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..8212c9d8d16
--- /dev/null
+++ b/yt/cpp/mapreduce/http/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,37 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-mapreduce-http)
+target_compile_options(cpp-mapreduce-http PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-mapreduce-http PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-deprecated-atomic
+ cpp-http-io
+ cpp-string_utils-base64
+ cpp-string_utils-quote
+ cpp-threading-cron
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ mapreduce-interface-logging
+ yt-core-http
+ yt-core-https
+)
+target_sources(cpp-mapreduce-http PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp
+)
diff --git a/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt b/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..b1993829102
--- /dev/null
+++ b/yt/cpp/mapreduce/http/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-mapreduce-http)
+target_compile_options(cpp-mapreduce-http PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-mapreduce-http PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-deprecated-atomic
+ cpp-http-io
+ cpp-string_utils-base64
+ cpp-string_utils-quote
+ cpp-threading-cron
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ mapreduce-interface-logging
+ yt-core-http
+ yt-core-https
+)
+target_sources(cpp-mapreduce-http PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp
+)
diff --git a/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..b1993829102
--- /dev/null
+++ b/yt/cpp/mapreduce/http/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-mapreduce-http)
+target_compile_options(cpp-mapreduce-http PRIVATE
+ -Wdeprecated-this-capture
+)
+target_link_libraries(cpp-mapreduce-http PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-deprecated-atomic
+ cpp-http-io
+ cpp-string_utils-base64
+ cpp-string_utils-quote
+ cpp-threading-cron
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ mapreduce-interface-logging
+ yt-core-http
+ yt-core-https
+)
+target_sources(cpp-mapreduce-http PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp
+)
diff --git a/yt/cpp/mapreduce/http/CMakeLists.txt b/yt/cpp/mapreduce/http/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/yt/cpp/mapreduce/http/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt b/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..b742e11f63c
--- /dev/null
+++ b/yt/cpp/mapreduce/http/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-mapreduce-http)
+target_link_libraries(cpp-mapreduce-http PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-deprecated-atomic
+ cpp-http-io
+ cpp-string_utils-base64
+ cpp-string_utils-quote
+ cpp-threading-cron
+ cpp-mapreduce-common
+ cpp-mapreduce-interface
+ mapreduce-interface-logging
+ yt-core-http
+ yt-core-https
+)
+target_sources(cpp-mapreduce-http PRIVATE
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/abortable_http_response.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/context.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/host_manager.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/http_client.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/requests.cpp
+ ${CMAKE_SOURCE_DIR}/yt/cpp/mapreduce/http/retry_request.cpp
+)
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
new file mode 100644
index 00000000000..9da9241d337
--- /dev/null
+++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp
@@ -0,0 +1,223 @@
+#include "abortable_http_response.h"
+
+#include <util/system/mutex.h>
+#include <util/generic/singleton.h>
+#include <util/generic/hash_set.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Registry of live abortable responses and of currently active outages.
+///
+/// Accessed through the singleton returned by @ref Get. Every method takes `Lock_`
+/// before touching the response list or the outage table.
+class TAbortableHttpResponseRegistry {
+public:
+    /// @brief Register an outage for urls containing `urlPattern`.
+    ///
+    /// @return id that must later be passed to @ref StopOutage.
+    TOutageId StartOutage(TString urlPattern, const TOutageOptions& options)
+    {
+        auto g = Guard(Lock_);
+        auto id = NextId_++;
+        IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
+        return id;
+    }
+
+    /// @brief Remove the outage with the given id from the outage table.
+    void StopOutage(TOutageId id)
+    {
+        auto g = Guard(Lock_);
+        IdToOutage_.erase(id);
+    }
+
+    /// @brief Start tracking `response` and apply every matching outage that still has responses left.
+    void Add(IAbortableHttpResponse* response)
+    {
+        auto g = Guard(Lock_);
+        for (auto& [id, entry] : IdToOutage_) {
+            if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
+                response->SetLengthLimit(entry.LengthLimit);
+                entry.Counter -= 1;
+            }
+        }
+        ResponseList_.PushBack(response);
+    }
+
+    /// @brief Stop tracking `response` (unlinks it from the intrusive list).
+    void Remove(IAbortableHttpResponse* response)
+    {
+        auto g = Guard(Lock_);
+        response->Unlink();
+    }
+
+    static TAbortableHttpResponseRegistry& Get()
+    {
+        return *Singleton<TAbortableHttpResponseRegistry>();
+    }
+
+    /// @brief Abort every tracked, not yet aborted response whose url contains `urlPattern`.
+    ///
+    /// @return number of responses aborted by this call.
+    int AbortAll(const TString& urlPattern)
+    {
+        // The list is modified by Add()/Remove() from other threads,
+        // so it must not be traversed without the lock.
+        auto g = Guard(Lock_);
+        int result = 0;
+        for (auto& response : ResponseList_) {
+            if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
+                response.Abort();
+                ++result;
+            }
+        }
+        return result;
+    }
+
+private:
+    struct TOutageEntry
+    {
+        TString Pattern;
+        // Number of responses this outage may still affect.
+        size_t Counter;
+        // Length limit handed to each affected response.
+        size_t LengthLimit;
+    };
+
+private:
+    TOutageId NextId_ = 0;
+    TIntrusiveList<IAbortableHttpResponse> ResponseList_;
+    THashMap<TOutageId, TOutageEntry> IdToOutage_;
+    TMutex Lock_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Start an outage in `registry` for urls containing `urlPattern`.
+///
+/// `Id_` is initialized from `UrlPattern_`, so this relies on `UrlPattern_`
+/// being declared (and therefore initialized) before `Id_` in the class.
+TAbortableHttpResponse::TOutage::TOutage(
+ TString urlPattern,
+ TAbortableHttpResponseRegistry& registry,
+ const TOutageOptions& options)
+ : UrlPattern_(std::move(urlPattern))
+ , Registry_(registry)
+ , Id_(registry.StartOutage(UrlPattern_, options))
+{ }
+
+/// @brief Stop the outage (if it has not been stopped yet) when the object goes away.
+TAbortableHttpResponse::TOutage::~TOutage()
+{
+ Stop();
+}
+
+/// @brief Unregister the outage from the registry; repeated calls do nothing.
+void TAbortableHttpResponse::TOutage::Stop()
+{
+    if (Stopped_) {
+        return;
+    }
+    Registry_.StopOutage(Id_);
+    Stopped_ = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Remember `url` and register this response in the registry.
+///
+/// Registration applies any active matching outage, so the response may already
+/// have a length limit (or be aborted) when the constructor returns.
+TAbortableHttpResponseBase::TAbortableHttpResponseBase(const TString& url)
+ : Url_(url)
+{
+ TAbortableHttpResponseRegistry::Get().Add(this);
+}
+
+/// @brief Remove this response from the registry.
+TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
+{
+ TAbortableHttpResponseRegistry::Get().Remove(this);
+}
+
+/// @brief Mark the response as aborted; the derived classes' read/skip methods check this flag.
+void TAbortableHttpResponseBase::Abort()
+{
+ Aborted_ = true;
+}
+
+/// @brief Set the length limit used by reads; a zero limit aborts the response right away.
+void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
+{
+    LengthLimit_ = limit;
+    if (limit == 0) {
+        Abort();
+    }
+}
+
+/// @return url passed to the constructor.
+const TString& TAbortableHttpResponseBase::GetUrl() const
+{
+ return Url_;
+}
+
+/// @return true once @ref Abort has been called.
+bool TAbortableHttpResponseBase::IsAborted() const
+{
+ return Aborted_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Forward `socketStream`, `requestId` and `hostName` to THttpResponse and
+/// register the response in the registry under `url` (done by the abortable base).
+TAbortableHttpResponse::TAbortableHttpResponse(
+ IInputStream* socketStream,
+ const TString& requestId,
+ const TString& hostName,
+ const TString& url)
+ : THttpResponse(socketStream, requestId, hostName)
+ , TAbortableHttpResponseBase(url)
+{
+}
+
+/// @brief Read at most `len` bytes, never more than the remaining length limit.
+///
+/// Throws TAbortedForTestPurpose if the response is already aborted. Once the
+/// remaining limit reaches zero the response is aborted, so the next read throws.
+size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
+{
+    if (Aborted_) {
+        ythrow TAbortedForTestPurpose() << "response was aborted";
+    }
+
+    const size_t allowed = std::min(len, LengthLimit_);
+    const size_t bytesRead = THttpResponse::DoRead(buf, allowed);
+
+    LengthLimit_ -= bytesRead;
+    if (LengthLimit_ == 0) {
+        Abort();
+    }
+    return bytesRead;
+}
+
+/// @brief Skip `len` bytes via THttpResponse; throws TAbortedForTestPurpose if aborted.
+///
+/// Unlike DoRead, skipping neither consults nor consumes the length limit.
+size_t TAbortableHttpResponse::DoSkip(size_t len)
+{
+ if (Aborted_) {
+ ythrow TAbortedForTestPurpose() << "response was aborted";
+ }
+ return THttpResponse::DoSkip(len);
+}
+
+/// @brief Delegate to the registry singleton; returns the number of responses it aborted.
+int TAbortableHttpResponse::AbortAll(const TString& urlPattern)
+{
+ return TAbortableHttpResponseRegistry::Get().AbortAll(urlPattern);
+}
+
+/// @brief Start an outage in the registry singleton with the given options.
+TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
+ const TString& urlPattern,
+ const TOutageOptions& options)
+{
+ return TOutage(urlPattern, TAbortableHttpResponseRegistry::Get(), options);
+}
+
+/// @brief Convenience overload: default options with only `ResponseCount` overridden.
+TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
+ const TString& urlPattern,
+ size_t responseCount)
+{
+ return StartOutage(urlPattern, TOutageOptions().ResponseCount(responseCount));
+}
+
+/// @brief Take ownership of `stream` and register the response in the registry under `url`.
+TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
+ std::unique_ptr<IInputStream> stream,
+ const TString& url)
+ : TAbortableHttpResponseBase(url)
+ , Stream_(std::move(stream))
+{
+}
+
+/// @brief Read at most `len` bytes from the wrapped stream, never more than the remaining length limit.
+///
+/// Throws TAbortedForTestPurpose if the response is already aborted. Once the
+/// remaining limit reaches zero the response is aborted, so the next read throws.
+size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
+{
+    if (Aborted_) {
+        ythrow TAbortedForTestPurpose() << "response was aborted";
+    }
+
+    const size_t allowed = std::min(len, LengthLimit_);
+    const size_t bytesRead = Stream_->Read(buf, allowed);
+
+    LengthLimit_ -= bytesRead;
+    if (LengthLimit_ == 0) {
+        Abort();
+    }
+    return bytesRead;
+}
+
+/// @brief Skip `len` bytes of the wrapped stream; throws TAbortedForTestPurpose if aborted.
+///
+/// Unlike DoRead, skipping neither consults nor consumes the length limit.
+size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
+{
+ if (Aborted_) {
+ ythrow TAbortedForTestPurpose() << "response was aborted";
+ }
+ return Stream_->Skip(len);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h
new file mode 100644
index 00000000000..d72bcfa0a69
--- /dev/null
+++ b/yt/cpp/mapreduce/http/abortable_http_response.h
@@ -0,0 +1,142 @@
+#pragma once
+
+#include "http.h"
+
+#include <util/generic/intrlist.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAbortableHttpResponseRegistry;
+
+using TOutageId = size_t;
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Exception thrown by abortable responses when a read or skip hits an aborted response.
+class TAbortedForTestPurpose
+ : public yexception
+{ };
+
+/// @brief Options of an outage started with @ref NYT::TAbortableHttpResponse::StartOutage.
+struct TOutageOptions
+{
+ using TSelf = TOutageOptions;
+
+ /// @brief Number of responses to abort.
+ FLUENT_FIELD_DEFAULT(size_t, ResponseCount, std::numeric_limits<size_t>::max());
+
+ /// @brief Number of bytes to read before abortion. If zero, abort immediately.
+ FLUENT_FIELD_DEFAULT(size_t, LengthLimit, 0);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Interface of a response that can be aborted on demand.
+///
+/// It is an intrusive list item so that instances can be linked into a TIntrusiveList.
+class IAbortableHttpResponse
+ : public TIntrusiveListItem<IAbortableHttpResponse>
+{
+public:
+ /// @brief Mark the response as aborted.
+ virtual void Abort() = 0;
+ /// @brief Url the response was created for; used for pattern matching.
+ virtual const TString& GetUrl() const = 0;
+ virtual bool IsAborted() const = 0;
+ /// @brief Limit the number of bytes that can still be read from the response.
+ virtual void SetLengthLimit(size_t limit) = 0;
+
+ virtual ~IAbortableHttpResponse() = default;
+};
+
+/// @brief Common state of abortable responses: url, abort flag and remaining length limit.
+///
+/// The constructor registers the object in the response registry and the destructor
+/// removes it from there.
+class TAbortableHttpResponseBase
+ : public IAbortableHttpResponse
+{
+public:
+ TAbortableHttpResponseBase(const TString& url);
+ ~TAbortableHttpResponseBase();
+
+ void Abort() override;
+ const TString& GetUrl() const override;
+ bool IsAborted() const override;
+ void SetLengthLimit(size_t limit) override;
+
+protected:
+ TString Url_;
+ // Atomic: set by Abort() and read by the read/skip methods of derived classes.
+ std::atomic<bool> Aborted_ = {false};
+ // Number of bytes that may still be read; the default means "no limit in practice".
+ size_t LengthLimit_ = std::numeric_limits<size_t>::max();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors.
+class TAbortableCoreHttpResponse
+ : public IInputStream
+ , public TAbortableHttpResponseBase
+{
+public:
+ /// @brief Wrap `stream` (ownership is taken) and register the response under `url`.
+ TAbortableCoreHttpResponse(
+ std::unique_ptr<IInputStream> stream,
+ const TString& url);
+
+private:
+ // Both throw TAbortedForTestPurpose once the response is aborted.
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ // Underlying stream all reads and skips are forwarded to.
+ std::unique_ptr<IInputStream> Stream_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Class extends @ref NYT::THttpResponse with possibility to emulate errors.
+class TAbortableHttpResponse
+    : public THttpResponse
+    , public TAbortableHttpResponseBase
+{
+public:
+    /// @brief Handle of a running outage; the outage is stopped when the handle is destroyed.
+    class TOutage
+    {
+    public:
+        TOutage(TString urlPattern, TAbortableHttpResponseRegistry& registry, const TOutageOptions& options);
+
+        /// @brief Transfer control over the outage to the new object.
+        ///
+        /// A defaulted move would leave `other.Stopped_ == false`, so the destructor of the
+        /// moved-from object would stop the very outage the new object is supposed to control.
+        TOutage(TOutage&& other) noexcept
+            : UrlPattern_(std::move(other.UrlPattern_))
+            , Registry_(other.Registry_)
+            , Id_(other.Id_)
+            , Stopped_(other.Stopped_)
+        {
+            other.Stopped_ = true;
+        }
+
+        TOutage(const TOutage&) = delete;
+        ~TOutage();
+
+        /// @brief Stop the outage; repeated calls do nothing.
+        void Stop();
+
+    private:
+        // Must stay declared before Id_: the constructor initializes Id_ from UrlPattern_.
+        TString UrlPattern_;
+        TAbortableHttpResponseRegistry& Registry_;
+        TOutageId Id_;
+        bool Stopped_ = false;
+    };
+
+public:
+    TAbortableHttpResponse(
+        IInputStream* socketStream,
+        const TString& requestId,
+        const TString& hostName,
+        const TString& url);
+
+    /// @brief Abort any responses which match `urlPattern` (i.e. contain it in url).
+    ///
+    /// @return number of aborted responses.
+    static int AbortAll(const TString& urlPattern);
+
+    /// @brief Start outage. Future responses which match `urlPattern` (i.e. contain it in url) will fail.
+    ///
+    /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed)
+    [[nodiscard]] static TOutage StartOutage(
+        const TString& urlPattern,
+        const TOutageOptions& options = TOutageOptions());
+
+    /// @brief Start outage. Future `responseCount` responses which match `urlPattern` (i.e. contain it in url) will fail.
+    ///
+    /// @return outage object controlling the lifetime of outage (outage stops when object is destroyed)
+    [[nodiscard]] static TOutage StartOutage(
+        const TString& urlPattern,
+        size_t responseCount);
+
+private:
+    // Both throw TAbortedForTestPurpose once the response is aborted.
+    size_t DoRead(void* buf, size_t len) override;
+    size_t DoSkip(size_t len) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp
new file mode 100644
index 00000000000..1c016263c51
--- /dev/null
+++ b/yt/cpp/mapreduce/http/context.cpp
@@ -0,0 +1,25 @@
+#include "context.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Field-by-field comparison of two client contexts.
+///
+/// NOTE(review): `Config` does not take part in the comparison — confirm this is intended.
+bool operator==(const TClientContext& lhs, const TClientContext& rhs)
+{
+    if (!(lhs.ServerName == rhs.ServerName)) {
+        return false;
+    }
+    if (!(lhs.Token == rhs.Token)) {
+        return false;
+    }
+    if (!(lhs.ImpersonationUser == rhs.ImpersonationUser)) {
+        return false;
+    }
+    if (!(lhs.ServiceTicketAuth == rhs.ServiceTicketAuth)) {
+        return false;
+    }
+    if (!(lhs.HttpClient == rhs.HttpClient)) {
+        return false;
+    }
+    if (!(lhs.UseTLS == rhs.UseTLS)) {
+        return false;
+    }
+    return lhs.TvmOnly == rhs.TvmOnly;
+}
+
+/// @brief Negation of operator== for client contexts.
+bool operator!=(const TClientContext& lhs, const TClientContext& rhs)
+{
+ return !(rhs == lhs);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h
new file mode 100644
index 00000000000..3926373e174
--- /dev/null
+++ b/yt/cpp/mapreduce/http/context.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/public.h>
+
+
+namespace NYT {
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// @brief Connection settings passed to the http helpers of this library.
+struct TClientContext
+{
+ // Cluster address; also used as the key of the per-cluster proxy list cache.
+ TString ServerName;
+ TString Token;
+ TMaybe<TString> ImpersonationUser;
+ NAuth::IServiceTicketAuthPtrWrapperPtr ServiceTicketAuth;
+ // Client used to perform requests (e.g. fetching the list of proxies).
+ NHttpClient::IHttpClientPtr HttpClient;
+ // TvmOnly and UseTLS together select the default proxy port.
+ bool TvmOnly = false;
+ bool UseTLS = false;
+ // Not compared by operator==.
+ TConfigPtr Config = TConfig::Get();
+};
+
+bool operator==(const TClientContext& lhs, const TClientContext& rhs);
+bool operator!=(const TClientContext& lhs, const TClientContext& rhs);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/core.h b/yt/cpp/mapreduce/http/core.h
new file mode 100644
index 00000000000..37c74d75515
--- /dev/null
+++ b/yt/cpp/mapreduce/http/core.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <yt/yt/core/http/public.h>
+
+#include <memory>
+
+namespace NYT::NHttp {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Wrapper for THeaderPtr which allows to hide NYT::IntrusivePtr from interfaces.
+struct THeadersPtrWrapper
+{
+    /// @brief Store `ptr` behind a std::shared_ptr so that copies of the wrapper share it.
+    THeadersPtrWrapper(THeadersPtr ptr)
+        : Ptr(std::make_shared<THeadersPtr>(std::move(ptr)))
+    { }
+
+    /// @return copy of the wrapped THeadersPtr.
+    ///
+    /// Const-qualified: the call only reads the wrapper, so it must be usable on const objects.
+    THeadersPtr Get() const {
+        return *Ptr;
+    }
+
+    std::shared_ptr<THeadersPtr> Ptr;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NHttp
diff --git a/yt/cpp/mapreduce/http/fwd.h b/yt/cpp/mapreduce/http/fwd.h
new file mode 100644
index 00000000000..62891731f6c
--- /dev/null
+++ b/yt/cpp/mapreduce/http/fwd.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <memory>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TClientContext;
+class THttpHeader;
+
+namespace NHttpClient {
+
+class IHttpClient;
+class IHttpRequest;
+class IHttpResponse;
+
+using IHttpClientPtr = std::shared_ptr<IHttpClient>;
+using IHttpResponsePtr = std::unique_ptr<IHttpResponse>;
+using IHttpRequestPtr = std::unique_ptr<IHttpRequest>;
+
+} // namespace NHttpClient
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/helpers.cpp b/yt/cpp/mapreduce/http/helpers.cpp
new file mode 100644
index 00000000000..233a565f205
--- /dev/null
+++ b/yt/cpp/mapreduce/http/helpers.cpp
@@ -0,0 +1,88 @@
+#include "helpers.h"
+
+#include "context.h"
+#include "requests.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+namespace NYT {
+
+///////////////////////////////////////////////////////////////////////////////
+
+/// @brief Append the default proxy port to `hostName` unless it already contains a ':'.
+///
+/// The port is selected by `context.TvmOnly` and `context.UseTLS`.
+TString CreateHostNameWithPort(const TString& hostName, const TClientContext& context)
+{
+    static constexpr int HttpProxyPort = 80;
+    static constexpr int HttpsProxyPort = 443;
+
+    static constexpr int TvmOnlyHttpProxyPort = 9026;
+    static constexpr int TvmOnlyHttpsProxyPort = 9443;
+
+    // A ':' is taken as a sign that the port is already specified.
+    if (hostName.find(':') != TString::npos) {
+        return hostName;
+    }
+
+    const int securePort = context.TvmOnly ? TvmOnlyHttpsProxyPort : HttpsProxyPort;
+    const int plainPort = context.TvmOnly ? TvmOnlyHttpProxyPort : HttpProxyPort;
+    int port = context.UseTLS ? securePort : plainPort;
+    return Format("%v:%v", hostName, port);
+}
+
+/// @brief Build "http://<hostName><header url>".
+///
+/// NOTE(review): `context` is unused and the scheme is always "http://", even when
+/// `context.UseTLS` is set — presumably the http client deals with TLS itself; confirm.
+TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{
+ Y_UNUSED(context);
+ return Format("http://%v%v", hostName, header.GetUrl());
+}
+
+/// @brief Render request parameters for logs: "<empty>" or their YSON string.
+static TString GetParametersDebugString(const THttpHeader& header)
+{
+    const auto& params = header.GetParameters();
+    if (!params.Empty()) {
+        return NodeToYsonString(params);
+    }
+    return "<empty>";
+}
+
+/// @brief Shorten `text` for logging.
+///
+/// Returns "empty" for an empty text, the text itself if it fits into `maxSize`,
+/// otherwise its first `maxSize` characters followed by "... (<size> bytes total)".
+/// `maxSize` must be greater than 10 (verified).
+TString TruncateForLogs(const TString& text, size_t maxSize)
+{
+    Y_VERIFY(maxSize > 10);
+
+    if (text.empty()) {
+        return TString("empty");
+    }
+    if (text.size() <= maxSize) {
+        return text;
+    }
+
+    TStringStream truncated;
+    truncated << text.substr(0, maxSize) << "... (" << text.size() << " bytes total)";
+    return truncated.Str();
+}
+
+/// @brief Describe a request for logs: its url and its (possibly truncated) parameters.
+///
+/// `includeParameters` only affects the "sent in header/body" note; `sizeLimit` is
+/// passed to TruncateForLogs.
+TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit)
+{
+    const auto parameters = GetParametersDebugString(header);
+    const char* const sentIn = includeParameters ? "header" : "body";
+
+    TStringStream out;
+    out << "Method: " << url << "; ";
+    out << "X-YT-Parameters (sent in " << sentIn << "): ";
+    out << TruncateForLogs(parameters, sizeLimit);
+    return out.Str();
+}
+
+/// @brief Write a debug log record about a request that is being sent.
+///
+/// Parameters are logged without truncation (the size limit is Max<size_t>()).
+void LogRequest(const THttpHeader& header, const TString& url, bool includeParameters, const TString& requestId, const TString& hostName)
+{
+ YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; %v)",
+ requestId,
+ hostName,
+ GetLoggedAttributes(header, url, includeParameters, Max<size_t>()));
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/helpers.h b/yt/cpp/mapreduce/http/helpers.h
new file mode 100644
index 00000000000..0c510fa2e86
--- /dev/null
+++ b/yt/cpp/mapreduce/http/helpers.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "fwd.h"
+
+#include "http.h"
+
+#include <util/generic/fwd.h>
+
+namespace NYT {
+
+///////////////////////////////////////////////////////////////////////////////
+
+TString CreateHostNameWithPort(const TString& name, const TClientContext& context);
+
+TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
+TString TruncateForLogs(const TString& text, size_t maxSize);
+
+TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit);
+
+void LogRequest(const THttpHeader& header, const TString& url, bool includeParameters, const TString& requestId, const TString& hostName);
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp
new file mode 100644
index 00000000000..a239dde769a
--- /dev/null
+++ b/yt/cpp/mapreduce/http/host_manager.cpp
@@ -0,0 +1,140 @@
+#include "host_manager.h"
+
+#include "context.h"
+#include "helpers.h"
+#include "http.h"
+#include "http_client.h"
+#include "requests.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <util/generic/guid.h>
+#include <util/generic/vector.h>
+#include <util/generic/singleton.h>
+#include <util/generic/ymath.h>
+
+#include <util/random/random.h>
+
+#include <util/string/vector.h>
+
+namespace NYT::NPrivate {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Parse `response` as JSON and return the elements of its top-level array as strings.
+///
+/// Throws if `response` is not valid JSON, so that a malformed reply is reported as a
+/// parse error instead of being silently treated as an empty list.
+static TVector<TString> ParseJsonStringArray(const TString& response)
+{
+    NJson::TJsonValue value;
+    TStringInput input(response);
+    NJson::ReadJsonTree(&input, &value, /*throwOnError*/ true);
+
+    const NJson::TJsonValue::TArray& array = value.GetArray();
+    TVector<TString> result;
+    result.reserve(array.size());
+    for (const auto& item : array) {
+        result.push_back(item.GetString());
+    }
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Result of fetching the proxy list of a cluster: either hosts or the fetch error,
+/// together with the moment the object was created.
+class THostManager::TClusterHostList
+{
+public:
+ /// @brief Successful result.
+ explicit TClusterHostList(TVector<TString> hosts)
+ : Hosts_(std::move(hosts))
+ , Timestamp_(TInstant::Now())
+ { }
+
+ /// @brief Failed result; `error` is rethrown by @ref ChooseHostOrThrow.
+ explicit TClusterHostList(std::exception_ptr error)
+ : Error_(std::move(error))
+ , Timestamp_(TInstant::Now())
+ { }
+
+ /// @brief Return a randomly chosen host.
+ ///
+ /// Rethrows the stored error if there is one; throws yexception if the list is empty.
+ TString ChooseHostOrThrow() const
+ {
+ if (Error_) {
+ std::rethrow_exception(Error_);
+ }
+
+ if (Hosts_.empty()) {
+ ythrow yexception() << "fetched list of proxies is empty";
+ }
+
+ return Hosts_[RandomNumber<size_t>(Hosts_.size())];
+ }
+
+ /// @brief Time elapsed since the object was constructed.
+ TDuration GetAge() const
+ {
+ return TInstant::Now() - Timestamp_;
+ }
+
+private:
+ TVector<TString> Hosts_;
+ std::exception_ptr Error_;
+ TInstant Timestamp_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Access the singleton instance.
+THostManager& THostManager::Get()
+{
+ return *Singleton<THostManager>();
+}
+
+/// @brief Drop all cached host lists (under the lock).
+void THostManager::Reset()
+{
+ auto guard = Guard(Lock_);
+ ClusterHosts_.clear();
+}
+
+/// @brief Choose a proxy host for the cluster `context.ServerName`.
+///
+/// A cached host list is reused while it is younger than
+/// `context.Config->HostListUpdateInterval`; otherwise the list is fetched again and
+/// the cache entry is replaced. Throws if the list could not be fetched or is empty;
+/// such a failure is not stored in the cache.
+TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
+{
+    auto cluster = context.ServerName;
+    {
+        auto guard = Guard(Lock_);
+        auto it = ClusterHosts_.find(cluster);
+        if (it != ClusterHosts_.end() && it->second.GetAge() < context.Config->HostListUpdateInterval) {
+            return it->second.ChooseHostOrThrow();
+        }
+    }
+
+    // The list is fetched with an http request, so the lock is not held here.
+    auto hostList = GetHosts(context);
+    auto result = hostList.ChooseHostOrThrow();
+    {
+        auto guard = Guard(Lock_);
+        // emplace() leaves an existing element untouched, so an outdated entry has to be
+        // removed first; otherwise it would stay in the cache forever and every
+        // subsequent call would fetch the list again.
+        ClusterHosts_.erase(cluster);
+        ClusterHosts_.emplace(cluster, std::move(hostList));
+    }
+    return result;
+}
+
+/// @brief Fetch the list of proxies from the endpoint `context.Config->Hosts` of the cluster.
+///
+/// Never throws std::exception-derived errors: a failure is captured inside the returned object.
+THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& context)
+{
+    // Leading slashes are stripped from the configured endpoint before it is put into the header.
+    TString endpoint = context.Config->Hosts;
+    while (endpoint.StartsWith("/")) {
+        endpoint = endpoint.substr(1);
+    }
+    THttpHeader header("GET", endpoint, false);
+
+    try {
+        auto hostName = context.ServerName;
+        auto requestId = CreateGuidAsString();
+        // TODO: we need to set socket timeout here
+        auto fullUrl = GetFullUrl(hostName, context, header);
+        auto response = context.HttpClient->Request(fullUrl, requestId, header);
+
+        auto hosts = ParseJsonStringArray(response->GetResponse());
+        for (auto& host : hosts) {
+            host = CreateHostNameWithPort(host, context);
+        }
+        return TClusterHostList(std::move(hosts));
+    } catch (const std::exception&) {
+        return TClusterHostList(std::current_exception());
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NPrivate
diff --git a/yt/cpp/mapreduce/http/host_manager.h b/yt/cpp/mapreduce/http/host_manager.h
new file mode 100644
index 00000000000..fdbb740566a
--- /dev/null
+++ b/yt/cpp/mapreduce/http/host_manager.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+#include <util/system/spinlock.h>
+
+
+namespace NYT::NPrivate {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/// @brief Singleton caching, per cluster, the list of proxies used for heavy requests.
+class THostManager
+{
+public:
+ static THostManager& Get();
+
+ /// @brief Choose a proxy host for the cluster given by `context.ServerName`.
+ TString GetProxyForHeavyRequest(const TClientContext& context);
+
+ // For testing purposes only.
+ void Reset();
+
+private:
+ class TClusterHostList;
+
+private:
+ // Guards ClusterHosts_.
+ TAdaptiveLock Lock_;
+ // Cluster name (TClientContext::ServerName) -> fetched host list.
+ THashMap<TString, TClusterHostList> ClusterHosts_;
+
+private:
+ static TClusterHostList GetHosts(const TClientContext& context);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NPrivate
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
new file mode 100644
index 00000000000..d44b2638a00
--- /dev/null
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -0,0 +1,1014 @@
+#include "http.h"
+
+#include "abortable_http_response.h"
+#include "core.h"
+#include "helpers.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/yt/core/http/http.h>
+
+#include <library/cpp/json/json_writer.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/string_utils/quote/quote.h>
+
+#include <util/generic/singleton.h>
+#include <util/generic/algorithm.h>
+
+#include <util/stream/mem.h>
+
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+#include <util/string/escape.h>
+#include <util/string/printf.h>
+
+#include <util/system/byteorder.h>
+#include <util/system/getpid.h>
+
+#include <exception>
+
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Output stream used to send the request header and body over the connection socket.
+//
+// Every write operation goes through WrapWriteFunc(): once a write has failed the
+// stream remembers the exception and rethrows it on every subsequent operation.
+// On the first failure the stream additionally tries to read the server response,
+// so that a meaningful TErrorResponse (if any) is reported instead of a bare
+// socket error.
+class THttpRequest::TRequestStream
+    : public IOutputStream
+{
+public:
+    // `httpRequest` is the owning request (used to read the error response);
+    // `s` is the socket the request is written to.
+    TRequestStream(THttpRequest* httpRequest, const TSocket& s)
+        : HttpRequest_(httpRequest)
+        , SocketOutput_(s)
+        , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_))
+    {
+        HttpOutput_.EnableKeepAlive(true);
+    }
+
+private:
+    void DoWrite(const void* buf, size_t len) override
+    {
+        WrapWriteFunc([&] {
+            HttpOutput_.Write(buf, len);
+        });
+    }
+
+    void DoWriteV(const TPart* parts, size_t count) override
+    {
+        WrapWriteFunc([&] {
+            HttpOutput_.Write(parts, count);
+        });
+    }
+
+    void DoWriteC(char ch) override
+    {
+        WrapWriteFunc([&] {
+            HttpOutput_.Write(ch);
+        });
+    }
+
+    void DoFlush() override
+    {
+        WrapWriteFunc([&] {
+            HttpOutput_.Flush();
+        });
+    }
+
+    void DoFinish() override
+    {
+        WrapWriteFunc([&] {
+            HttpOutput_.Finish();
+        });
+    }
+
+    // Runs `func` unless a previous write has already failed.
+    // Templated on the callable so that the lambdas above are invoked directly,
+    // without wrapping each write into a type-erased std::function.
+    template <class TFunc>
+    void WrapWriteFunc(TFunc&& func)
+    {
+        CheckErrorState();
+        try {
+            func();
+        } catch (const std::exception&) {
+            HandleWriteException();
+        }
+    }
+
+    // In many cases http proxy stops reading request and resets connection
+    // if error has happened. This function tries to read error response
+    // in such cases.
+    //
+    // Must be called from inside a catch block: it stores the in-flight exception.
+    // Throws TErrorResponse if the server managed to send one, otherwise
+    // rethrows the original write error.
+    void HandleWriteException() {
+        Y_VERIFY(WriteError_ == nullptr);
+        WriteError_ = std::current_exception();
+        Y_VERIFY(WriteError_ != nullptr);
+        try {
+            HttpRequest_->GetResponseStream();
+        } catch (const TErrorResponse &) {
+            throw;
+        } catch (...) {
+            // Best effort only: any other failure while reading the response
+            // is ignored in favour of the original write error.
+        }
+        std::rethrow_exception(WriteError_);
+    }
+
+    // Rethrows the stored write error, if any.
+    void CheckErrorState()
+    {
+        if (WriteError_) {
+            std::rethrow_exception(WriteError_);
+        }
+    }
+
+private:
+    THttpRequest* const HttpRequest_;
+    TSocketOutput SocketOutput_;
+    THttpOutput HttpOutput_;
+    // First exception thrown by a write operation; null while the stream is healthy.
+    std::exception_ptr WriteError_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Describes a request with HTTP method `method` for command `command`.
+// `isApi` selects the URL layout built by GetUrl(): "/api/<version>/<command>"
+// when true, "/<command>" otherwise.
+THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
+ : Method(method)
+ , Command(command)
+ , IsApi(isApi)
+{ }
+
+// Stores `value` under `key`.
+// If the key is already present it is replaced when `overwrite` is set,
+// otherwise yexception ("Duplicate key") is thrown.
+void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
+{
+    auto existing = Parameters.find(key);
+    if (existing == Parameters.end()) {
+        Parameters.emplace(key, std::move(value));
+        return;
+    }
+    if (!overwrite) {
+        ythrow yexception() << "Duplicate key: " << key;
+    }
+    existing->second = std::move(value);
+}
+
+// Adds every entry of the map node `newParameters` via AddParameter(),
+// with the same `overwrite` policy for each key.
+void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
+{
+    for (const auto& [name, node] : newParameters.AsMap()) {
+        AddParameter(name, node, overwrite);
+    }
+}
+
+// Removes the parameter stored under `key`, if there is one.
+void THttpHeader::RemoveParameter(const TString& key)
+{
+ Parameters.erase(key);
+}
+
+// Returns the accumulated parameters as a TNode built from the parameter map (a copy).
+TNode THttpHeader::GetParameters() const
+{
+ return Parameters;
+}
+
+// Sets the "transaction_id" parameter from `transactionId`.
+// An empty (false-valued) id removes the parameter instead.
+void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
+{
+    if (!transactionId) {
+        RemoveParameter("transaction_id");
+        return;
+    }
+    AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite);
+}
+
+// Sets the "path" parameter; duplicate handling follows AddParameter().
+void THttpHeader::AddPath(const TString& path, bool overwrite)
+{
+ AddParameter("path", path, overwrite);
+}
+
+// Sets the "operation_id" parameter to the textual form of `operationId`;
+// duplicate handling follows AddParameter().
+void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite)
+{
+ AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
+}
+
+// Generates a fresh GUID and stores it as the "mutation_id" parameter,
+// replacing any previous value.
+void THttpHeader::AddMutationId()
+{
+ TGUID guid;
+
+ // Some users use `fork()' with yt wrapper
+ // (actually they use python + multiprocessing)
+ // and CreateGuid is not resistant to `fork()', so spice it a little bit.
+ //
+ // Check IGNIETFERRO-610
+ CreateGuid(&guid);
+ // Mix the process id and the current time into one GUID word so that
+ // forked children produce different ids.
+ guid.dw[2] = GetPID() ^ MicroSeconds();
+
+ AddParameter("mutation_id", GetGuidAsString(guid), true);
+}
+
+// True if a "mutation_id" parameter is currently set.
+bool THttpHeader::HasMutationId() const
+{
+ return Parameters.contains("mutation_id");
+}
+
+// Sets the token that GetHeader() sends as "Authorization: OAuth <token>".
+void THttpHeader::SetToken(const TString& token)
+{
+ Token = token;
+}
+
+// Sets the user name that GetHeader() sends in "X-Yt-User-Name".
+void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
+{
+ ImpersonationUser = impersonationUser;
+}
+
+// Sets the ticket that GetHeader() sends in "X-Ya-Service-Ticket".
+void THttpHeader::SetServiceTicket(const TString& ticket)
+{
+ ServiceTicket = ticket;
+}
+
+// Sets the format reported in "X-YT-Input-Format"; an empty TMaybe omits the header.
+void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
+{
+ InputFormat = format;
+}
+
+// Sets the format reported in "X-YT-Output-Format"; an empty TMaybe omits the header.
+void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
+{
+ OutputFormat = format;
+}
+
+// Returns the currently configured output format (a copy).
+TMaybe<TFormat> THttpHeader::GetOutputFormat() const
+{
+ return OutputFormat;
+}
+
+// Sets the value sent in the "Content-Encoding" header.
+void THttpHeader::SetRequestCompression(const TString& compression)
+{
+ RequestCompression = compression;
+}
+
+// Sets the value sent in the "Accept-Encoding" header.
+void THttpHeader::SetResponseCompression(const TString& compression)
+{
+ ResponseCompression = compression;
+}
+
+// Returns the command name passed to the constructor (a copy).
+TString THttpHeader::GetCommand() const
+{
+ return Command;
+}
+
+// Builds the request path: "/api/<ApiVersion>/<Command>" for API commands,
+// "/<Command>" otherwise.
+TString THttpHeader::GetUrl() const
+{
+    TStringStream path;
+    path << "/";
+    if (IsApi) {
+        path << "api/" << TConfig::Get()->ApiVersion << "/";
+    }
+    path << Command;
+    return path.Str();
+}
+
+// True if the command is listed in the CommandsWithFraming config option;
+// GetHeaderAsString() then adds "X-YT-Accept-Framing: 1".
+bool THttpHeader::ShouldAcceptFraming() const
+{
+ return TConfig::Get()->CommandsWithFraming.contains(Command);
+}
+
+// Renders the complete HTTP/1.1 request head: the request line, the headers
+// produced by GetHeader(), the optional framing header and the terminating
+// empty line.
+TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
+{
+    TStringStream head;
+    head << Method << " " << GetUrl() << " HTTP/1.1\r\n";
+
+    auto headers = GetHeader(hostName, requestId, includeParameters);
+    headers.Get()->WriteTo(&head);
+
+    const bool acceptFraming = ShouldAcceptFraming();
+    if (acceptFraming) {
+        head << "X-YT-Accept-Framing: 1\r\n";
+    }
+    head << "\r\n";
+
+    return head.Str();
+}
+
+// Builds the set of HTTP headers for this request.
+//
+// `hostName` goes to "Host", `requestId` to "X-YT-Correlation-Id".
+// When `includeParameters` is false the "X-YT-Parameters" header is omitted
+// (SmallRequest() sends the parameters in the request body instead).
+NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const
+{
+ auto headers = New<NHttp::THeaders>();
+
+ headers->Add("Host", hostName);
+ headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
+
+ // Credentials are optional; empty values produce no header.
+ if (!Token.empty()) {
+ headers->Add("Authorization", "OAuth " + Token);
+ }
+ if (!ServiceTicket.empty()) {
+ headers->Add("X-Ya-Service-Ticket", ServiceTicket);
+ }
+ if (!ImpersonationUser.empty()) {
+ headers->Add("X-Yt-User-Name", ImpersonationUser);
+ }
+
+ // Requests that may carry a body are always sent chunked.
+ if (Method == "PUT" || Method == "POST") {
+ headers->Add("Transfer-Encoding", "chunked");
+ }
+
+ headers->Add("X-YT-Correlation-Id", requestId);
+ headers->Add("X-YT-Header-Format", "<format=text>yson");
+
+ headers->Add("Content-Encoding", RequestCompression);
+ headers->Add("Accept-Encoding", ResponseCompression);
+
+ // Adds a YT header. Empty values are skipped. Values of up to 64 KiB are
+ // added verbatim under `headerName`; longer values are base64-encoded and
+ // split into pieces of at most 64 KiB added as `headerName`0, `headerName`1, ...
+ auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
+ static const size_t maxHttpHeaderSize = 64 << 10;
+ if (!value) {
+ return;
+ }
+ if (value.size() <= maxHttpHeaderSize) {
+ headers->Add(headerName, value);
+ return;
+ }
+
+ TString encoded;
+ Base64Encode(value, encoded);
+ auto ptr = encoded.data();
+ auto finish = encoded.data() + encoded.size();
+ size_t index = 0;
+ // `encoded` is non-empty here, so the loop body runs at least once.
+ do {
+ auto end = Min(ptr + maxHttpHeaderSize, finish);
+ headers->Add(Format("%v%v", headerName, index++), TString(ptr, end));
+ ptr = end;
+ } while (ptr != finish);
+ };
+
+ if (InputFormat) {
+ printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
+ }
+ if (OutputFormat) {
+ printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
+ }
+ if (includeParameters) {
+ printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
+ }
+
+ return NHttp::THeadersPtrWrapper(std::move(headers));
+}
+
+// Returns the HTTP method passed to the constructor.
+const TString& THttpHeader::GetMethod() const
+{
+ return Method;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Returns the TAddressCache instance provided by Singleton<>.
+TAddressCache* TAddressCache::Get()
+{
+ return Singleton<TAddressCache>();
+}
+
+// Checks whether `address` contains at least one entry of an address family
+// demanded by the ForceIpV4 / ForceIpV6 config options.
+// When neither option is set any address is acceptable.
+bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address)
+{
+    const auto wantV4 = TConfig::Get()->ForceIpV4;
+    const auto wantV6 = TConfig::Get()->ForceIpV6;
+    if (!wantV4 && !wantV6) {
+        return true;
+    }
+
+    for (auto it = address->Begin(); it != address->End(); ++it) {
+        const auto family = (*it).ai_family;
+        const bool matchesV4 = wantV4 && family == AF_INET;
+        const bool matchesV6 = wantV6 && family == AF_INET6;
+        if (matchesV4 || matchesV6) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Returns the network address for `hostName` ("host" or "host:port"),
+// taking it from the cache when a usable entry exists and resolving it otherwise.
+// Resolution is retried according to the default request retry policy until
+// the result contains an address of the required IP version; when the policy
+// gives up a yexception is thrown.
+TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName)
+{
+ auto address = FindAddress(hostName);
+ if (address) {
+ return address;
+ }
+
+ // Split "host:port"; the port defaults to 80.
+ // NOTE(review): the split is made at the first ':' — a bare IPv6 literal
+ // would be cut incorrectly; confirm callers never pass one.
+ TString host(hostName);
+ ui16 port = 80;
+
+ auto colon = hostName.find(':');
+ if (colon != TString::npos) {
+ port = FromString<ui16>(hostName.substr(colon + 1));
+ host = hostName.substr(0, colon);
+ }
+
+ auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get());
+ // Prepared up front: handed to the retry policy on every failed attempt
+ // and thrown once the policy refuses to retry.
+ auto error = yexception() << "can not resolve address of required version for host " << hostName;
+ while (true) {
+ address = new TNetworkAddress(host, port);
+ if (ContainsAddressOfRequiredVersion(address)) {
+ break;
+ }
+ retryPolicy->NotifyNewAttempt();
+ YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v",
+ hostName,
+ retryPolicy->GetAttemptDescription());
+ if (auto backoffDuration = retryPolicy->OnGenericError(error)) {
+ NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
+ } else {
+ ythrow error;
+ }
+ }
+
+ // The cache is keyed by the original "host[:port]" string.
+ AddAddress(hostName, address);
+ return address;
+}
+
+// Looks `hostName` up in the cache.
+// Returns nullptr when there is no entry, when the entry has expired, or when
+// it holds no address of the IP version currently required by the config.
+TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const
+{
+    // Copy the entry out so that the checks below run without the lock.
+    TCacheEntry cached;
+    {
+        TReadGuard guard(Lock_);
+        auto found = Cache_.find(hostName);
+        if (found == Cache_.end()) {
+            return nullptr;
+        }
+        cached = found->second;
+    }
+
+    const bool expired = TInstant::Now() > cached.ExpirationTime;
+    if (expired) {
+        YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution",
+            hostName);
+        return nullptr;
+    }
+
+    const bool usable = ContainsAddressOfRequiredVersion(cached.Address);
+    if (!usable) {
+        YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution",
+            hostName);
+        return nullptr;
+    }
+
+    return cached.Address;
+}
+
+// Stores `address` for `hostName`, valid for AddressCacheExpirationTimeout
+// from now. An existing entry for the same host is replaced.
+void TAddressCache::AddAddress(TString hostName, TAddressPtr address)
+{
+    auto entry = TCacheEntry{
+        .Address = std::move(address),
+        .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout,
+    };
+
+    {
+        TWriteGuard guard(Lock_);
+        // Assign rather than emplace: emplace() keeps an already present entry,
+        // so an expired (or wrong-family) entry would never be refreshed and
+        // every later Resolve() would have to resolve the host again.
+        Cache_[hostName] = std::move(entry);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Returns the TConnectionPool instance provided by Singleton<>.
+TConnectionPool* TConnectionPool::Get()
+{
+ return Singleton<TConnectionPool>();
+}
+
+// Returns a connection to `hostName` marked as busy: an idle pooled one whose
+// deadline has not passed, or a newly opened one.
+// A zero `socketTimeout` means "use the SocketTimeout config option".
+TConnectionPtr TConnectionPool::Connect(
+ const TString& hostName,
+ TDuration socketTimeout)
+{
+ // Drop stale / excessive idle connections first.
+ Refresh();
+
+ if (socketTimeout == TDuration::Zero()) {
+ socketTimeout = TConfig::Get()->SocketTimeout;
+ }
+
+ // Try to reuse a pooled connection to the same host.
+ {
+ auto guard = Guard(Lock_);
+ auto now = TInstant::Now();
+ auto range = Connections_.equal_range(hostName);
+ for (auto it = range.first; it != range.second; ++it) {
+ auto& connection = it->second;
+ if (connection->DeadLine < now) {
+ continue;
+ }
+ // Atomically claim the connection (Busy: 0 -> 1); skip it if someone else owns it.
+ if (!AtomicCas(&connection->Busy, 1, 0)) {
+ continue;
+ }
+
+ connection->DeadLine = now + socketTimeout;
+ // NOTE(review): only whole seconds are passed — confirm sub-second timeouts are not expected.
+ connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
+ return connection;
+ }
+ }
+
+ // Nothing to reuse: open a new connection (a new TConnection starts with Busy == 1).
+ TConnectionPtr connection(new TConnection);
+
+ auto networkAddress = TAddressCache::Get()->Resolve(hostName);
+ TSocketHolder socket(DoConnect(networkAddress));
+ // DoConnect() leaves the socket in non-blocking mode; switch it back.
+ SetNonBlock(socket, false);
+
+ connection->Socket.Reset(new TSocket(socket.Release()));
+
+ connection->DeadLine = TInstant::Now() + socketTimeout;
+ connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
+
+ {
+ auto guard = Guard(Lock_);
+ // Sequence number used in log messages only; incremented under Lock_.
+ static ui32 connectionId = 0;
+ connection->Id = ++connectionId;
+ Connections_.insert({hostName, connection});
+ }
+
+ YT_LOG_DEBUG("New connection to %v #%v opened",
+ hostName,
+ connection->Id);
+
+ return connection;
+}
+
+// Returns `connection` to the pool: prolongs its deadline by the configured
+// SocketTimeout, restores the default socket timeout and clears the busy flag.
+void TConnectionPool::Release(TConnectionPtr connection)
+{
+ auto socketTimeout = TConfig::Get()->SocketTimeout;
+ auto newDeadline = TInstant::Now() + socketTimeout;
+
+ {
+ auto guard = Guard(Lock_);
+ connection->DeadLine = newDeadline;
+ }
+
+ connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
+ // From this point on the connection may be picked up by Connect().
+ AtomicSet(connection->Busy, 0);
+
+ Refresh();
+}
+
+// Removes `connection` from the pool entries registered for `hostName`.
+// Does nothing if the connection is not found there.
+void TConnectionPool::Invalidate(
+    const TString& hostName,
+    TConnectionPtr connection)
+{
+    auto guard = Guard(Lock_);
+    auto [current, last] = Connections_.equal_range(hostName);
+    for (; current != last; ++current) {
+        if (!(current->second == connection)) {
+            continue;
+        }
+        YT_LOG_DEBUG("Closing connection #%v",
+            connection->Id);
+        Connections_.erase(current);
+        return;
+    }
+}
+
+// Evicts idle connections from the pool: first those exceeding the
+// ConnectionPoolSize limit (earliest deadline first), then those whose
+// deadline has passed. Busy connections are never touched.
+void TConnectionPool::Refresh()
+{
+ auto guard = Guard(Lock_);
+
+ // simple, since we don't expect too many connections
+ using TItem = std::pair<TInstant, TConnectionMap::iterator>;
+ std::vector<TItem> sortedConnections;
+ for (auto it = Connections_.begin(); it != Connections_.end(); ++it) {
+ sortedConnections.emplace_back(it->second->DeadLine, it);
+ }
+
+ // Earliest deadline first.
+ std::sort(
+ sortedConnections.begin(),
+ sortedConnections.end(),
+ [] (const TItem& a, const TItem& b) -> bool {
+ return a.first < b.first;
+ });
+
+ // Number of connections above the configured limit; zero or negative means no excess.
+ auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize;
+
+ const auto now = TInstant::Now();
+ for (const auto& item : sortedConnections) {
+ const auto& mapIterator = item.second;
+ // Copy of the shared pointer: the connection is still used for logging after erase().
+ auto connection = mapIterator->second;
+ if (AtomicGet(connection->Busy)) {
+ continue;
+ }
+
+ if (removeCount > 0) {
+ Connections_.erase(mapIterator);
+ YT_LOG_DEBUG("Closing connection #%v (too many opened connections)",
+ connection->Id);
+ --removeCount;
+ continue;
+ }
+
+ if (connection->DeadLine < now) {
+ Connections_.erase(mapIterator);
+ YT_LOG_DEBUG("Closing connection #%v (timeout)",
+ connection->Id);
+ }
+ }
+}
+
+// Tries the entries of `address` one by one and returns the first socket that
+// connects. Entries of a family excluded by ForceIpV4 / ForceIpV6 are skipped.
+// The returned socket is in non-blocking mode and is owned by the caller.
+// Throws TSystemError if polling fails or if no entry could be connected.
+SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
+{
+ int lastError = 0;
+
+ for (auto i = address->Begin(); i != address->End(); ++i) {
+ struct addrinfo* info = &*i;
+
+ if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) {
+ continue;
+ }
+
+ if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) {
+ continue;
+ }
+
+ TSocketHolder socket(
+ ::socket(info->ai_family, info->ai_socktype, info->ai_protocol));
+
+ if (socket.Closed()) {
+ lastError = LastSystemError();
+ continue;
+ }
+
+ // Non-blocking connect so that ConnectTimeout can be enforced via poll below.
+ SetNonBlock(socket, true);
+ if (TConfig::Get()->SocketPriority) {
+ SetSocketPriority(socket, *TConfig::Get()->SocketPriority);
+ }
+
+ if (connect(socket, info->ai_addr, info->ai_addrlen) == 0)
+ return socket.Release();
+
+ int err = LastSystemError();
+ if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
+ // Connection is in progress: wait until the socket becomes writable
+ // or the connect deadline passes.
+ struct pollfd p = {
+ socket,
+ POLLOUT,
+ 0
+ };
+ const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout);
+ // A negative result is a negated error code; it aborts the whole
+ // attempt instead of moving on to the next address.
+ if (n < 0) {
+ ythrow TSystemError(-(int)n) << "can not connect to " << info;
+ }
+ // Fetch the final outcome of the asynchronous connect.
+ CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error");
+ if (!err)
+ return socket.Release();
+ }
+
+ // Remember the failure and try the next address.
+ lastError = err;
+ continue;
+ }
+
+ ythrow TSystemError(lastError) << "can not connect to " << *address;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Returns the value of the "X-YT-Proxy" response header, or Nothing()
+// when the header is absent.
+static TMaybe<TString> GetProxyName(const THttpInput& input)
+{
+    const auto* proxyHeader = input.Headers().FindHeader("X-YT-Proxy");
+    if (!proxyHeader) {
+        return Nothing();
+    }
+    return proxyHeader->Value();
+}
+
+// Starts reading an HTTP response from `socketStream`: parses the status line
+// and headers and, for any status other than 200 / 202, prepares the
+// TErrorResponse later thrown by CheckErrorResponse().
+// The constructor itself does not throw the YT error.
+THttpResponse::THttpResponse(
+ IInputStream* socketStream,
+ const TString& requestId,
+ const TString& hostName)
+ : HttpInput_(socketStream)
+ , RequestId_(requestId)
+ // Prefer the proxy name reported by the server over the name we connected to.
+ , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
+ // Body is framed only if the server announced it.
+ , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
+{
+ HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine());
+ if (HttpCode_ == 200 || HttpCode_ == 202) {
+ return;
+ }
+
+ ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_);
+
+ auto logAndSetError = [&] (const TString& rawError) {
+ YT_LOG_ERROR("RSP %v - HTTP %v - %v",
+ RequestId_,
+ HttpCode_,
+ rawError.data());
+ ErrorResponse_->SetRawError(rawError);
+ };
+
+ switch (HttpCode_) {
+ case 429:
+ logAndSetError("request rate limit exceeded");
+ break;
+
+ case 500:
+ logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_);
+ break;
+
+ default: {
+ // Any other status: log all response headers and take the error
+ // details from the X-YT-Error header when ParseError() yields one.
+ TStringStream httpHeaders;
+ httpHeaders << "HTTP headers (";
+ for (const auto& header : HttpInput_.Headers()) {
+ httpHeaders << header.Name() << ": " << header.Value() << "; ";
+ }
+ httpHeaders << ")";
+
+ auto errorString = Sprintf("RSP %s - HTTP %d - %s",
+ RequestId_.data(),
+ HttpCode_,
+ httpHeaders.Str().data());
+
+ YT_LOG_ERROR("%v",
+ errorString.data());
+
+ if (auto parsedResponse = ParseError(HttpInput_.Headers())) {
+ ErrorResponse_ = parsedResponse.GetRef();
+ } else {
+ ErrorResponse_->SetRawError(
+ errorString + " - X-YT-Error is missing in headers");
+ }
+ break;
+ }
+ }
+}
+
+// Returns the response headers parsed from the underlying HTTP input.
+const THttpHeaders& THttpResponse::Headers() const
+{
+ return HttpInput_.Headers();
+}
+
+// Throws the TErrorResponse prepared by the constructor, if any.
+void THttpResponse::CheckErrorResponse() const
+{
+ if (ErrorResponse_) {
+ throw *ErrorResponse_;
+ }
+}
+
+// True once a read or skip has hit the end of the body and the trailers were checked.
+bool THttpResponse::IsExhausted() const
+{
+ return IsExhausted_;
+}
+
+// Returns the HTTP status code parsed from the status line.
+int THttpResponse::GetHttpCode() const
+{
+ return HttpCode_;
+}
+
+// Returns the proxy name from the "X-YT-Proxy" header, or the host name given
+// to the constructor when that header is absent.
+const TString& THttpResponse::GetHostName() const
+{
+ return HostName_;
+}
+
+// Reports whether the underlying HTTP input considers the connection keep-alive.
+bool THttpResponse::IsKeepAlive() const
+{
+ return HttpInput_.IsKeepAlive();
+}
+
+TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
+{
+ for (const auto& header : headers) {
+ if (header.Name() == "X-YT-Error") {
+ TErrorResponse errorResponse(HttpCode_, RequestId_);
+ errorResponse.ParseFromJsonError(header.Value());
+ if (errorResponse.IsOk()) {
+ return Nothing();
+ }
+ return errorResponse;
+ }
+ }
+ return Nothing();
+}
+
+// Reads up to `len` bytes of the response body into `buf`, removing framing
+// when the server uses it. Returns the number of bytes read.
+// On end of stream the trailers are checked and a TErrorResponse found there
+// is thrown (see CheckTrailers()).
+size_t THttpResponse::DoRead(void* buf, size_t len)
+{
+ size_t read;
+ if (Unframe_) {
+ read = UnframeRead(buf, len);
+ } else {
+ read = HttpInput_.Read(buf, len);
+ }
+ // Zero bytes for a non-empty request means the body is exhausted.
+ if (read == 0 && len != 0) {
+ // THttpInput MUST return defined (but may be empty)
+ // trailers when it is exhausted.
+ Y_VERIFY(HttpInput_.Trailers().Defined(),
+ "trailers MUST be defined for exhausted stream");
+ CheckTrailers(HttpInput_.Trailers().GetRef());
+ IsExhausted_ = true;
+ }
+ return read;
+}
+
+// Skips up to `len` bytes of the response body, removing framing when the
+// server uses it. Returns the number of bytes skipped.
+// End of stream is handled exactly as in DoRead(): trailers are checked and
+// an error found there is thrown.
+size_t THttpResponse::DoSkip(size_t len)
+{
+ size_t skipped;
+ if (Unframe_) {
+ skipped = UnframeSkip(len);
+ } else {
+ skipped = HttpInput_.Skip(len);
+ }
+ // Nothing skipped for a non-empty request means the body is exhausted.
+ if (skipped == 0 && len != 0) {
+ // THttpInput MUST return defined (but may be empty)
+ // trailers when it is exhausted.
+ Y_VERIFY(HttpInput_.Trailers().Defined(),
+ "trailers MUST be defined for exhausted stream");
+ CheckTrailers(HttpInput_.Trailers().GetRef());
+ IsExhausted_ = true;
+ }
+ return skipped;
+}
+
+void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
+{
+ if (auto errorResponse = ParseError(trailers)) {
+ errorResponse->SetIsFromTrailers(true);
+ YT_LOG_ERROR("RSP %v - %v",
+ RequestId_,
+ errorResponse.GetRef().what());
+ ythrow errorResponse.GetRef();
+ }
+}
+
+// Reads the 4-byte little-endian payload size that follows a data frame marker.
+// Throws yexception if the stream ends before all four bytes are available.
+static ui32 ReadDataFrameSize(THttpInput* stream)
+{
+    ui32 rawSize;
+    const auto loaded = stream->Load(&rawSize, sizeof(rawSize));
+    if (loaded >= sizeof(rawSize)) {
+        return LittleToHost(rawSize);
+    }
+    ythrow yexception() << "Bad data frame header: " <<
+        "expected " << sizeof(rawSize) << " bytes, got " << loaded;
+}
+
+// Makes sure there is payload left in the current data frame, reading frame
+// headers from the stream as needed.
+// Returns false when the stream ends before another data frame starts.
+// Throws yexception on an unknown frame type or a truncated frame header.
+bool THttpResponse::RefreshFrameIfNecessary()
+{
+ // Loops because keep-alive frames and zero-length data frames carry no payload.
+ while (RemainingFrameSize_ == 0) {
+ ui8 frameTypeByte;
+ auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte));
+ if (read == 0) {
+ return false;
+ }
+ auto frameType = static_cast<EFrameType>(frameTypeByte);
+ switch (frameType) {
+ case EFrameType::KeepAlive:
+ break;
+ case EFrameType::Data:
+ RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_);
+ break;
+ default:
+ ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
+ }
+ }
+ return true;
+}
+
+// Reads up to `len` payload bytes of a framed body; never crosses the boundary
+// of the current data frame. Returns 0 when no further data frame is available.
+// NOTE(review): a stream that ends in the middle of a frame also yields 0 here
+// and is then treated as a normal end of body by DoRead() — confirm this is intended.
+size_t THttpResponse::UnframeRead(void* buf, size_t len)
+{
+ if (!RefreshFrameIfNecessary()) {
+ return 0;
+ }
+ auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_));
+ RemainingFrameSize_ -= read;
+ return read;
+}
+
+// Skips up to `len` payload bytes of a framed body; never crosses the boundary
+// of the current data frame. Returns 0 when no further data frame is available.
+size_t THttpResponse::UnframeSkip(size_t len)
+{
+ if (!RefreshFrameIfNecessary()) {
+ return 0;
+ }
+ auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_));
+ RemainingFrameSize_ -= skipped;
+ return skipped;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Creates a request with a freshly generated GUID as its request id.
+THttpRequest::THttpRequest()
+{
+ RequestId = CreateGuidAsString();
+}
+
+// Creates a request that uses the caller-supplied `requestId`.
+THttpRequest::THttpRequest(const TString& requestId)
+ : RequestId(requestId)
+{ }
+
+// Hands the connection back to the pool when it can be safely reused,
+// otherwise removes it from the pool. Does nothing if there is no connection
+// (never connected, or InvalidateConnection() was called).
+THttpRequest::~THttpRequest()
+{
+ if (!Connection) {
+ return;
+ }
+
+ if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
+ // We should return to the pool only connections where HTTP response was fully read.
+ // Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
+ TConnectionPool::Get()->Release(Connection);
+ } else {
+ TConnectionPool::Get()->Invalidate(HostName, Connection);
+ }
+}
+
+// Returns the id of this request (a copy).
+TString THttpRequest::GetRequestId() const
+{
+ return RequestId;
+}
+
+// Obtains a connection to `hostName` from the connection pool and records the
+// start time used later for the timing reported by GetResponse().
+// `socketTimeout` is forwarded to TConnectionPool::Connect().
+void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
+{
+ HostName = std::move(hostName);
+ YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
+ RequestId,
+ HostName);
+
+ StartTime_ = TInstant::Now();
+ Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
+
+ YT_LOG_DEBUG("REQ %v - connection #%v",
+ RequestId,
+ Connection->Id);
+}
+
+// Logs the request, writes the request head to the connection and returns the
+// stream for writing the body. The stream is owned by this object.
+// `includeParameters` controls whether parameters go into the headers.
+// Requires a prior successful Connect().
+IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
+{
+ auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
+ Url_ = header.GetUrl();
+
+ LogRequest(header, Url_, includeParameters, RequestId, HostName);
+
+ // Kept for the response log line produced by GetResponse().
+ LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
+
+ // Response bodies are logged only when they are text YSON.
+ auto outputFormat = header.GetOutputFormat();
+ if (outputFormat && outputFormat->IsTextYson()) {
+ LogResponse = true;
+ }
+
+ RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
+
+ RequestStream_->Write(strHeader.data(), strHeader.size());
+ return RequestStream_.Get();
+}
+
+// Starts a request whose parameters are sent in the headers; returns the
+// stream for writing the request body.
+IOutputStream* THttpRequest::StartRequest(const THttpHeader& header)
+{
+ return StartRequestImpl(header, true);
+}
+
+// Flushes and finishes the request stream created by StartRequest().
+void THttpRequest::FinishRequest()
+{
+ RequestStream_->Flush();
+ RequestStream_->Finish();
+}
+
+// Sends a complete request in one call.
+// For PUT / POST requests without an explicit `body` the parameters are sent
+// as the request body (and left out of the headers); in all other cases the
+// parameters go into the headers and `body`, if given, is sent as is.
+void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body)
+{
+    const bool parametersInBody =
+        !body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST");
+
+    if (parametersInBody) {
+        const auto serializedParameters = NodeToYsonString(header.GetParameters());
+        auto* stream = StartRequestImpl(header, false);
+        stream->Write(serializedParameters);
+    } else {
+        auto* stream = StartRequest(header);
+        if (body) {
+            stream->Write(*body);
+        }
+    }
+    FinishRequest();
+}
+
+// Returns the response stream, creating it on first use (this reads the
+// response status line and headers from the socket).
+// On the first call a non-successful response is reported by throwing
+// TErrorResponse; the stream stays created, so later calls return it
+// without rechecking.
+THttpResponse* THttpRequest::GetResponseStream()
+{
+ if (!Input) {
+ SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
+ if (TConfig::Get()->UseAbortableResponse) {
+ // The abortable response needs the URL recorded when the request was started.
+ Y_VERIFY(!Url_.empty());
+ Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
+ } else {
+ Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
+ }
+ Input->CheckErrorResponse();
+ }
+ return Input.Get();
+}
+
+// Reads the whole response body and returns it, logging the elapsed time,
+// the responding host and the request attributes.
+// May throw TErrorResponse (see GetResponseStream() and THttpResponse::DoRead()).
+TString THttpRequest::GetResponse()
+{
+ TString result = GetResponseStream()->ReadAll();
+
+ TStringStream loggedAttributes;
+ loggedAttributes
+ << "Time: " << TInstant::Now() - StartTime_ << "; "
+ << "HostName: " << GetResponseStream()->GetHostName() << "; "
+ << LoggedAttributes_;
+
+ if (LogResponse) {
+ // Log at most 128 characters of the body.
+ constexpr auto sizeLimit = 1 << 7;
+ YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
+ RequestId,
+ TruncateForLogs(result, sizeLimit),
+ loggedAttributes.Str());
+ } else {
+ YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
+ RequestId,
+ result.size(),
+ loggedAttributes.Str());
+ }
+ return result;
+}
+
+// Returns the HTTP status code of the response; creates the response stream
+// if needed, so it may throw like GetResponseStream().
+int THttpRequest::GetHttpCode() {
+ return GetResponseStream()->GetHttpCode();
+}
+
+// Removes the connection from the pool and drops this request's reference to
+// it, so the destructor will not touch the pool.
+void THttpRequest::InvalidateConnection()
+{
+ TConnectionPool::Get()->Invalidate(HostName, Connection);
+ Connection.Reset();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
new file mode 100644
index 00000000000..ee8783088db
--- /dev/null
+++ b/yt/cpp/mapreduce/http/http.h
@@ -0,0 +1,256 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/format.h>
+#include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/interface/node.h>
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <library/cpp/http/io/stream.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/hash_multi_map.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/guid.h>
+#include <util/network/socket.h>
+#include <util/stream/input.h>
+#include <util/system/mutex.h>
+#include <util/system/rwlock.h>
+#include <util/generic/ptr.h>
+
+namespace NYT {
+
+class TNode;
+
+namespace NHttp {
+
+struct THeadersPtrWrapper;
+
+} // NHttp
+
+///////////////////////////////////////////////////////////////////////////////
+
+// Type marker (one byte) that precedes every frame of a framed response body.
+enum class EFrameType
+{
+ // Followed by a 4-byte little-endian size and that many payload bytes.
+ Data = 0x01,
+ // Carries no payload.
+ KeepAlive = 0x02,
+};
+
+
+// Describes one HTTP request to a YT proxy: method, command, parameters,
+// credentials, data formats and compression. Produces the request URL and
+// the set of HTTP headers.
+class THttpHeader
+{
+public:
+ // `isApi` selects between "/api/<version>/<command>" and "/<command>" URLs.
+ THttpHeader(const TString& method, const TString& command, bool isApi = true);
+
+ // Parameter management. Adding an existing key throws unless `overwrite` is set.
+ void AddParameter(const TString& key, TNode value, bool overwrite = false);
+ void RemoveParameter(const TString& key);
+ void MergeParameters(const TNode& parameters, bool overwrite = false);
+ TNode GetParameters() const;
+
+ // Shortcuts for well-known parameters.
+ // An empty transaction id removes the "transaction_id" parameter.
+ void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
+ void AddPath(const TString& path, bool overwrite = false);
+ void AddOperationId(const TOperationId& operationId, bool overwrite = false);
+ // Generates a new mutation id, replacing the previous one.
+ void AddMutationId();
+ bool HasMutationId() const;
+
+ // Credentials; empty values are not sent.
+ void SetToken(const TString& token);
+ void SetImpersonationUser(const TString& impersonationUser);
+
+ void SetServiceTicket(const TString& ticket);
+
+ void SetInputFormat(const TMaybe<TFormat>& format);
+
+ void SetOutputFormat(const TMaybe<TFormat>& format);
+ TMaybe<TFormat> GetOutputFormat() const;
+
+ // Values of the Content-Encoding / Accept-Encoding headers.
+ void SetRequestCompression(const TString& compression);
+ void SetResponseCompression(const TString& compression);
+
+ TString GetCommand() const;
+ TString GetUrl() const;
+ // Full textual request head (request line + headers + empty line).
+ TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
+ // Headers only; `includeParameters` controls the X-YT-Parameters header.
+ NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
+
+ const TString& GetMethod() const;
+
+private:
+ bool ShouldAcceptFraming() const;
+
+private:
+ const TString Method;
+ const TString Command;
+ const bool IsApi;
+
+ TNode::TMapType Parameters;
+ TString ImpersonationUser;
+ TString Token;
+ TString ServiceTicket;
+ // NOTE(review): not referenced by any THttpHeader method in http.cpp — possibly unused.
+ TNode Attributes;
+
+private:
+ // Both formats default to text YSON.
+ TMaybe<TFormat> InputFormat = TFormat::YsonText();
+ TMaybe<TFormat> OutputFormat = TFormat::YsonText();
+
+ TString RequestCompression = "identity";
+ TString ResponseCompression = "identity";
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Cache of resolved network addresses keyed by "host[:port]" strings.
+// Entries expire after the AddressCacheExpirationTimeout config option.
+class TAddressCache
+{
+public:
+ using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>;
+
+ static TAddressCache* Get();
+
+ // Returns a cached address or resolves `hostName` (retrying as configured);
+ // throws when no address of the required IP version can be obtained.
+ TAddressPtr Resolve(const TString& hostName);
+
+private:
+ struct TCacheEntry {
+ TAddressPtr Address;
+ // Moment after which the entry must not be used.
+ TInstant ExpirationTime;
+ };
+
+private:
+ // Returns nullptr when there is no usable entry.
+ TAddressPtr FindAddress(const TString& hostName) const;
+ void AddAddress(TString hostName, TAddressPtr address);
+
+private:
+ // Taken for reading in FindAddress() and for writing in AddAddress().
+ TRWMutex Lock_;
+ THashMap<TString, TCacheEntry> Cache_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// State of one pooled connection.
+struct TConnection
+{
+    THolder<TSocket> Socket;
+    // Non-zero while the connection is handed out to a request;
+    // a new connection starts out busy.
+    TAtomic Busy = 1;
+    // Moment after which an idle connection is considered stale.
+    TInstant DeadLine;
+    // Sequence number used in log messages. The pool assigns ids starting
+    // from 1; zero-initialized so that it is never read uninitialized.
+    ui32 Id = 0;
+};
+
+using TConnectionPtr = TAtomicSharedPtr<TConnection>;
+
+// Pool of connections to proxies, keyed by host name. Several connections
+// to the same host may coexist.
+class TConnectionPool
+{
+public:
+ using TConnectionMap = THashMultiMap<TString, TConnectionPtr>;
+
+ static TConnectionPool* Get();
+
+ // Returns a busy connection to `hostName`, reusing an idle one when possible.
+ // A zero `socketTimeout` selects the configured default.
+ TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout);
+ // Marks `connection` idle so that it can be reused.
+ void Release(TConnectionPtr connection);
+ // Removes `connection` from the pool.
+ void Invalidate(const TString& hostName, TConnectionPtr connection);
+
+private:
+ // Evicts idle connections that are expired or exceed the pool size limit.
+ void Refresh();
+ // Opens a socket to one of the addresses in `address`; throws on failure.
+ static SOCKET DoConnect(TAddressCache::TAddressPtr address);
+
+private:
+ // Guards Connections_ and the DeadLine fields of pooled connections.
+ TMutex Lock_;
+ TConnectionMap Connections_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+//
+// Input stream that handles YT-specific header/trailer errors
+// and throws TErrorResponse if it finds any.
+class THttpResponse
+ : public IInputStream
+{
+public:
+ // 'requestId' and 'hostName' are provided for debug reasons
+ // (they will appear in some error messages).
+ // Parses the status line and headers; does not throw YT errors itself,
+ // see CheckErrorResponse().
+ THttpResponse(
+ IInputStream* socketStream,
+ const TString& requestId,
+ const TString& hostName);
+
+ const THttpHeaders& Headers() const;
+
+ // Throws TErrorResponse if the status code was neither 200 nor 202.
+ void CheckErrorResponse() const;
+ // True after the end of the body has been reached and trailers checked.
+ bool IsExhausted() const;
+ int GetHttpCode() const;
+ // Proxy name from "X-YT-Proxy" if present, otherwise the constructor's `hostName`.
+ const TString& GetHostName() const;
+ bool IsKeepAlive() const;
+
+protected:
+ // Both may throw TErrorResponse when an error arrives in the trailers.
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ void CheckTrailers(const THttpHeaders& trailers);
+ TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers);
+ // Helpers for bodies sent with framing (see EFrameType).
+ size_t UnframeRead(void* buf, size_t len);
+ size_t UnframeSkip(size_t len);
+ bool RefreshFrameIfNecessary();
+
+private:
+ // NOTE: HostName_ and Unframe_ are initialized from HttpInput_, so
+ // HttpInput_ must be declared before them.
+ THttpInput HttpInput_;
+ const TString RequestId_;
+ const TString HostName_;
+ int HttpCode_ = 0;
+ // Set by the constructor for non-successful status codes.
+ TMaybe<TErrorResponse> ErrorResponse_;
+ bool IsExhausted_ = false;
+ // True if the response carries the "X-YT-Framing" header.
+ const bool Unframe_;
+ // Payload bytes left in the current data frame.
+ size_t RemainingFrameSize_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// A single HTTP request / response exchange over a pooled connection.
+// Typical sequence: Connect(), then StartRequest() + FinishRequest() (or
+// SmallRequest()), then GetResponseStream() / GetResponse().
+// The destructor returns the connection to the pool only if the response was
+// fully read and is keep-alive; otherwise the connection is dropped.
+class THttpRequest
+{
+public:
+ // Generates a new request id.
+ THttpRequest();
+ THttpRequest(const TString& requestId);
+ ~THttpRequest();
+
+ TString GetRequestId() const;
+
+ // A zero `socketTimeout` selects the configured default.
+ void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero());
+
+ // Writes the request head and returns the stream for the body (owned by this object).
+ IOutputStream* StartRequest(const THttpHeader& header);
+ void FinishRequest();
+
+ // Sends the whole request at once; without `body`, PUT / POST requests
+ // carry their parameters in the body.
+ void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body);
+
+ // Returns the response stream (owned by this object); throws TErrorResponse
+ // on a non-successful response when the stream is first created.
+ THttpResponse* GetResponseStream();
+
+ // Reads and returns the whole response body.
+ TString GetResponse();
+
+ // Drops the connection so that it is not reused.
+ void InvalidateConnection();
+
+ int GetHttpCode();
+
+private:
+ IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters);
+
+private:
+ class TRequestStream;
+
+private:
+ TString HostName;
+ TString RequestId;
+ TString Url_;
+ // Set in Connect(); used for the timing in the response log line.
+ TInstant StartTime_;
+ TString LoggedAttributes_;
+
+ TConnectionPtr Connection;
+
+ THolder<TRequestStream> RequestStream_;
+
+ THolder<TSocketInput> SocketInput;
+ THolder<THttpResponse> Input;
+
+ // True when the response body should be included in the debug log.
+ bool LogResponse = false;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
new file mode 100644
index 00000000000..a2af1182dcf
--- /dev/null
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -0,0 +1,603 @@
+#include "http_client.h"
+
+#include "abortable_http_response.h"
+#include "core.h"
+#include "helpers.h"
+#include "http.h"
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/yt/core/concurrency/thread_pool_poller.h>
+
+#include <yt/yt/core/http/client.h>
+#include <yt/yt/core/http/config.h>
+#include <yt/yt/core/http/http.h>
+
+#include <yt/yt/core/https/client.h>
+#include <yt/yt/core/https/config.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+namespace NYT::NHttpClient {
+
+namespace {
+
+// Builds the "host[:port]" string: the ":port" suffix is appended only when
+// `port` is non-empty, otherwise the host is returned as is.
+TString CreateHost(TStringBuf host, TStringBuf port)
+{
+    if (port.empty()) {
+        return TString(host);
+    }
+
+    return Format("%v:%v", host, port);
+}
+
+// Translates a non-successful HTTP response into a TErrorResponse.
+//
+// Returns an empty TMaybe for 200 (OK) and 202 (Accepted), and also when the
+// X-YT-Error header is present but parses to an "ok" error. For every other
+// outcome the returned error carries either the parsed X-YT-Error payload or
+// a raw textual description. Each error path is logged.
+TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
+{
+    auto statusCode = response->GetStatusCode();
+    if (statusCode == NHttp::EStatusCode::OK || statusCode == NHttp::EStatusCode::Accepted) {
+        return {};
+    }
+
+    TErrorResponse error(static_cast<int>(statusCode), requestId);
+
+    // Logs a fixed message and stores the same text as the raw error.
+    auto reportRawError = [&] (const TString& message) {
+        YT_LOG_ERROR("RSP %v - HTTP %v - %v",
+            requestId,
+            statusCode,
+            message.data());
+        error.SetRawError(message);
+    };
+
+    if (statusCode == NHttp::EStatusCode::TooManyRequests) {
+        reportRawError("request rate limit exceeded");
+        return error;
+    }
+
+    if (statusCode == NHttp::EStatusCode::InternalServerError) {
+        reportRawError("internal error in proxy " + hostName);
+        return error;
+    }
+
+    // Any other status: dump all headers into the log line first.
+    TStringStream headerDump;
+    headerDump << "HTTP headers (";
+    for (const auto& [name, value] : response->GetHeaders()->Dump()) {
+        headerDump << name << ": " << value << "; ";
+    }
+    headerDump << ")";
+
+    auto description = Sprintf("RSP %s - HTTP %d - %s",
+        requestId.data(),
+        static_cast<int>(statusCode),
+        headerDump.Str().data());
+
+    YT_LOG_ERROR("%v",
+        description.data());
+
+    auto errorHeader = response->GetHeaders()->Find("X-YT-Error");
+    if (!errorHeader) {
+        error.SetRawError(
+            description + " - X-YT-Error is missing in headers");
+        return error;
+    }
+
+    // The structured error from the proxy takes precedence over the raw text.
+    error.ParseFromJsonError(*errorHeader);
+    if (error.IsOk()) {
+        return Nothing();
+    }
+    return error;
+}
+
+// Throws the TErrorResponse computed by GetErrorResponse, if there is one;
+// returns normally otherwise.
+void CheckErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
+{
+    if (auto maybeError = GetErrorResponse(hostName, requestId, response)) {
+        throw *maybeError;
+    }
+}
+
+} // namespace
+
+///////////////////////////////////////////////////////////////////////////////
+
+// IHttpResponse adapter over THttpRequest: owns the request object and
+// forwards every call to it.
+class TDefaultHttpResponse
+    : public IHttpResponse
+{
+public:
+    // Takes ownership of `request`.
+    // `explicit` keeps a std::unique_ptr<THttpRequest> from converting into a
+    // response object implicitly; both call sites in this file construct it
+    // through std::make_unique, which is direct initialization.
+    explicit TDefaultHttpResponse(std::unique_ptr<THttpRequest> request)
+        : Request_(std::move(request))
+    { }
+
+    // HTTP status code as reported by the underlying request.
+    int GetStatusCode() override
+    {
+        return Request_->GetHttpCode();
+    }
+
+    // Stream over the response body; the pointer comes from the owned request.
+    IInputStream* GetResponseStream() override
+    {
+        return Request_->GetResponseStream();
+    }
+
+    // Response body as returned by THttpRequest::GetResponse().
+    TString GetResponse() override
+    {
+        return Request_->GetResponse();
+    }
+
+    // Identifier the request was created with.
+    TString GetRequestId() const override
+    {
+        return Request_->GetRequestId();
+    }
+
+private:
+    std::unique_ptr<THttpRequest> Request_;
+};
+
+// IHttpRequest adapter over THttpRequest for requests whose body is written
+// by the caller: holds the request and the output stream obtained from it.
+class TDefaultHttpRequest
+    : public IHttpRequest
+{
+public:
+    // `output` is stored as a non-owning pointer; `httpRequest` is owned.
+    TDefaultHttpRequest(std::unique_ptr<THttpRequest> httpRequest, IOutputStream* output)
+        : Request_(std::move(httpRequest))
+        , Stream_(output)
+    { }
+
+    // Stream the caller writes the request body into.
+    IOutputStream* GetStream() override
+    {
+        return Stream_;
+    }
+
+    // Finishes the request and hands ownership of it over to the response
+    // object; Request_ is empty afterwards.
+    IHttpResponsePtr Finish() override
+    {
+        Request_->FinishRequest();
+        IHttpResponsePtr response = std::make_unique<TDefaultHttpResponse>(std::move(Request_));
+        return response;
+    }
+
+private:
+    std::unique_ptr<THttpRequest> Request_;
+    IOutputStream* Stream_;
+};
+
+// IHttpClient implementation on top of THttpRequest.
+class TDefaultHttpClient
+    : public IHttpClient
+{
+public:
+    // Connects, sends `header` together with the optional `body`, and returns
+    // a response object that owns the request.
+    IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override
+    {
+        auto connected = OpenConnection(url, requestId, config);
+        connected->SmallRequest(header, body);
+        return std::make_unique<TDefaultHttpResponse>(std::move(connected));
+    }
+
+    // Connects and starts a request whose body is written by the caller
+    // through the returned object's stream.
+    IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
+    {
+        auto connected = OpenConnection(url, requestId, config);
+        auto* output = connected->StartRequest(header);
+        return std::make_unique<TDefaultHttpRequest>(std::move(connected), output);
+    }
+
+private:
+    // Creates a THttpRequest for `requestId` and connects it to the host[:port]
+    // parsed out of `url`, using the socket timeout from `config`.
+    static std::unique_ptr<THttpRequest> OpenConnection(const TString& url, const TString& requestId, const THttpConfig& config)
+    {
+        auto request = std::make_unique<THttpRequest>(requestId);
+
+        auto urlRef = NHttp::ParseUrl(url);
+
+        request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
+        return request;
+    }
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+// Per-request data shared between TCoreHttpClient, TCoreHttpRequest and
+// TCoreHttpResponse; used for logging and error reporting.
+struct TCoreRequestContext
+{
+    // "host[:port]" derived from Url.
+    TString HostName;
+    // Full request URL.
+    TString Url;
+    TString RequestId;
+    // Whether the response body itself (truncated) is logged rather than only
+    // its size. Initialized here so that a default-constructed context never
+    // holds an indeterminate bool.
+    bool LogResponse = false;
+    // Time the context was created; used to log the request duration.
+    TInstant StartTime;
+    // Preformatted request attributes appended to the response log line.
+    TString LoggedAttributes;
+};
+
+class TCoreHttpResponse // IHttpResponse over an NHttp::IResponsePtr plus the TCoreRequestContext of the request that produced it.
+ : public IHttpResponse
+{
+public:
+ TCoreHttpResponse(
+ TCoreRequestContext context,
+ NHttp::IResponsePtr response)
+ : Context_(std::move(context))
+ , Response_(std::move(response))
+ { }
+
+ int GetStatusCode() override
+ {
+ return static_cast<int>(Response_->GetStatusCode());
+ }
+
+ IInputStream* GetResponseStream() override // Builds Stream_ on the first successful call; later calls return the cached pointer.
+ {
+ if (!Stream_) {
+ auto stream = std::make_unique<TWrappedStream>(
+ NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
+ Response_,
+ Context_.RequestId);
+ CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); // Throws TErrorResponse on an error status; Stream_ stays unset in that case, so the check runs again on the next call.
+
+ if (TConfig::Get()->UseAbortableResponse) { // Optionally wrap the stream into TAbortableCoreHttpResponse, keyed by the request URL.
+ Y_VERIFY(!Context_.Url.empty());
+ Stream_ = std::make_unique<TAbortableCoreHttpResponse>(std::move(stream), Context_.Url);
+ } else {
+ Stream_ = std::move(stream);
+ }
+ }
+
+ return Stream_.get();
+ }
+
+ TString GetResponse() override // Reads the whole body and logs it (truncated) or only its size, depending on Context_.LogResponse.
+ {
+ auto result = GetResponseStream()->ReadAll();
+
+ TStringStream loggedAttributes;
+ loggedAttributes
+ << "Time: " << TInstant::Now() - Context_.StartTime << "; "
+ << "HostName: " << Context_.HostName << "; "
+ << Context_.LoggedAttributes;
+
+ if (Context_.LogResponse) {
+ constexpr auto sizeLimit = 1 << 7; // 128; passed to TruncateForLogs as the size limit.
+ YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
+ Context_.RequestId,
+ TruncateForLogs(result, sizeLimit),
+ loggedAttributes.Str());
+ } else {
+ YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
+ Context_.RequestId,
+ result.size(),
+ loggedAttributes.Str());
+ }
+ return result;
+ }
+
+ TString GetRequestId() const override
+ {
+ return Context_.RequestId;
+ }
+
+private:
+ class TWrappedStream // Input stream wrapper that inspects the response trailers for X-YT-Error once the underlying stream yields 0 bytes.
+ : public IInputStream
+ {
+ public:
+ TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
+ : Underlying_(std::move(underlying))
+ , Response_(std::move(response))
+ , RequestId_(std::move(requestId))
+ { }
+
+ protected:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ size_t read = Underlying_->Read(buf, len);
+
+ if (read == 0 && len != 0) { // Nothing read although bytes were requested: check the trailers for an error.
+ CheckTrailers(Response_->GetTrailers());
+ }
+ return read;
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ size_t skipped = Underlying_->Skip(len);
+ if (skipped == 0 && len != 0) { // Same trailer check as in DoRead.
+ CheckTrailers(Response_->GetTrailers());
+ }
+ return skipped;
+ }
+
+ private:
+ void CheckTrailers(const NHttp::THeadersPtr& trailers) // Logs and throws the error parsed from `trailers`, marked as coming from trailers; no-op when ParseError returns nothing.
+ {
+ if (auto errorResponse = ParseError(trailers)) {
+ errorResponse->SetIsFromTrailers(true);
+ YT_LOG_ERROR("RSP %v - %v",
+ RequestId_,
+ errorResponse.GetRef().what());
+ ythrow errorResponse.GetRef();
+ }
+ }
+
+ TMaybe<TErrorResponse> ParseError(const NHttp::THeadersPtr& headers) // Returns Nothing() when X-YT-Error is absent or parses to an OK error.
+ {
+ if (auto errorHeader = headers->Find("X-YT-Error")) {
+ TErrorResponse errorResponse(static_cast<int>(Response_->GetStatusCode()), RequestId_);
+ errorResponse.ParseFromJsonError(*errorHeader);
+ if (errorResponse.IsOk()) {
+ return Nothing();
+ }
+ return errorResponse;
+ }
+ return Nothing();
+ }
+
+ private:
+ std::unique_ptr<IInputStream> Underlying_;
+ NHttp::IResponsePtr Response_;
+ TString RequestId_;
+ };
+
+private:
+ TCoreRequestContext Context_;
+ NHttp::IResponsePtr Response_;
+ std::unique_ptr<IInputStream> Stream_; // Empty until GetResponseStream() succeeds.
+};
+
+// IHttpRequest over an NHttp::IActiveRequestPtr. The caller writes the body
+// through GetStream(); write failures are intercepted so that an error
+// response sent by the proxy can be surfaced instead of the bare I/O error.
+class TCoreHttpRequest
+    : public IHttpRequest
+{
+public:
+    // The initializers rely on the member declaration order below:
+    // ActiveRequest_ before Stream_, Stream_ before WrappedStream_.
+    TCoreHttpRequest(TCoreRequestContext context, NHttp::IActiveRequestPtr activeRequest)
+        : Context_(std::move(context))
+        , ActiveRequest_(std::move(activeRequest))
+        , Stream_(NConcurrency::CreateBufferedSyncAdapter(ActiveRequest_->GetRequestStream()))
+        , WrappedStream_(this, Stream_.get())
+    { }
+
+    // Stream for the request body; owned by this object.
+    IOutputStream* GetStream() override
+    {
+        return &WrappedStream_;
+    }
+
+    // Flushes buffered data, finishes the request and returns the response.
+    // Context_ is moved into the response.
+    IHttpResponsePtr Finish() override
+    {
+        WrappedStream_.Flush();
+        auto response = ActiveRequest_->Finish().Get().ValueOrThrow();
+        return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
+    }
+
+    // Builds a response object from whatever response the active request
+    // holds, without finishing the request. Context_ is moved into it.
+    IHttpResponsePtr FinishWithError()
+    {
+        auto response = ActiveRequest_->GetResponse();
+        return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
+    }
+
+private:
+    // Output stream that routes every operation through WrapWriteFunc.
+    class TWrappedStream
+        : public IOutputStream
+    {
+    public:
+        // Both pointers are non-owning.
+        TWrappedStream(TCoreHttpRequest* httpRequest, IOutputStream* underlying)
+            : HttpRequest_(httpRequest)
+            , Underlying_(underlying)
+        { }
+
+    private:
+        void DoWrite(const void* buf, size_t len) override
+        {
+            WrapWriteFunc([&] {
+                Underlying_->Write(buf, len);
+            });
+        }
+
+        void DoWriteV(const TPart* parts, size_t count) override
+        {
+            WrapWriteFunc([&] {
+                Underlying_->Write(parts, count);
+            });
+        }
+
+        void DoWriteC(char ch) override
+        {
+            WrapWriteFunc([&] {
+                Underlying_->Write(ch);
+            });
+        }
+
+        void DoFlush() override
+        {
+            WrapWriteFunc([&] {
+                Underlying_->Flush();
+            });
+        }
+
+        void DoFinish() override
+        {
+            WrapWriteFunc([&] {
+                Underlying_->Finish();
+            });
+        }
+
+        // Runs `func` unless an earlier operation already failed, in which
+        // case the stored error is rethrown. A std::exception thrown by `func`
+        // is handed to HandleWriteException().
+        // Template instead of std::function: the callable is invoked directly,
+        // so no type-erased wrapper is built on every write or flush.
+        template <class TFunc>
+        void WrapWriteFunc(TFunc&& func)
+        {
+            CheckErrorState();
+            try {
+                func();
+            } catch (const std::exception&) {
+                HandleWriteException();
+            }
+        }
+
+        // In many cases http proxy stops reading request and resets connection
+        // if error has happened. This function tries to read error response
+        // in such cases.
+        //
+        // Must be called from inside a catch handler. Never returns normally:
+        // throws the TErrorResponse if one is found, otherwise rethrows the
+        // original write error.
+        void HandleWriteException() {
+            Y_VERIFY(WriteError_ == nullptr);
+            WriteError_ = std::current_exception();
+            Y_VERIFY(WriteError_ != nullptr);
+            try {
+                HttpRequest_->FinishWithError()->GetResponseStream();
+            } catch (const TErrorResponse &) {
+                throw;
+            } catch (...) {
+                // Best effort: any other failure is ignored in favour of the
+                // original write error rethrown below.
+            }
+            std::rethrow_exception(WriteError_);
+        }
+
+        // Rethrows the stored write error, if any.
+        void CheckErrorState()
+        {
+            if (WriteError_) {
+                std::rethrow_exception(WriteError_);
+            }
+        }
+
+    private:
+        TCoreHttpRequest* const HttpRequest_;
+        IOutputStream* Underlying_;
+        // First write failure; once set, every later operation rethrows it.
+        std::exception_ptr WriteError_;
+    };
+
+private:
+    TCoreRequestContext Context_;
+    NHttp::IActiveRequestPtr ActiveRequest_;
+    std::unique_ptr<IOutputStream> Stream_;
+    TWrappedStream WrappedStream_;
+};
+
+// IHttpClient implementation on top of the NHttp / NHttps clients.
+class TCoreHttpClient
+    : public IHttpClient
+{
+public:
+    // Creates a one-thread poller and an HTTPS or HTTP client on it, depending
+    // on `useTLS`. `config->ConnectionPoolSize` bounds the idle connections.
+    TCoreHttpClient(bool useTLS, const TConfigPtr& config)
+        : Poller_(NConcurrency::CreateThreadPoolPoller(1, "http_poller")) // TODO(nadya73): YT-18363: move threads count to config
+    {
+        if (useTLS) {
+            auto httpsConfig = NYT::New<NYT::NHttps::TClientConfig>();
+            httpsConfig->MaxIdleConnections = config->ConnectionPoolSize;
+            Client_ = NHttps::CreateClient(httpsConfig, Poller_);
+        } else {
+            auto httpConfig = NYT::New<NYT::NHttp::TClientConfig>();
+            httpConfig->MaxIdleConnections = config->ConnectionPoolSize;
+            Client_ = NHttp::CreateClient(httpConfig, Poller_);
+        }
+    }
+
+    // Performs a complete request and returns its response.
+    //  - PUT/POST without a body: the header parameters are serialized to YSON
+    //    and sent as the request body.
+    //  - GET: sent through Client_->Get.
+    //  - anything else: streamed through TCoreHttpRequest, writing `body` if set.
+    // The THttpConfig argument is currently ignored (see the TODO below).
+    IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header, TMaybe<TStringBuf> body) override
+    {
+        TCoreRequestContext context = CreateContext(url, requestId, header);
+
+        // TODO(nadya73): YT-18363: pass socket timeouts from THttpConfig
+
+        NHttp::IResponsePtr response;
+
+        auto logRequest = [&](bool includeParameters) {
+            LogRequest(header, url, includeParameters, requestId, context.HostName);
+            context.LoggedAttributes = GetLoggedAttributes(header, url, includeParameters, LoggedAttributesSizeLimit);
+        };
+
+        if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
+            const auto& parameters = header.GetParameters();
+            auto parametersStr = NodeToYsonString(parameters);
+
+            bool includeParameters = false;
+            auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
+
+            logRequest(includeParameters);
+
+            auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
+
+            activeRequest->GetRequestStream()->Write(TSharedRef::FromString(parametersStr)).Get().ThrowOnError();
+            response = activeRequest->Finish().Get().ValueOrThrow();
+        } else {
+            bool includeParameters = true;
+            auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
+
+            logRequest(includeParameters);
+
+            if (header.GetMethod() == "GET") {
+                // Built only on this path: the streaming branch below writes
+                // `body` directly and never needs the TSharedRef copy.
+                auto bodyRef = TSharedRef::FromString(TString(body ? *body : ""));
+                response = RequestImpl(header.GetMethod(), url, headers, bodyRef);
+            } else {
+                auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
+
+                auto request = std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
+                if (body) {
+                    request->GetStream()->Write(*body);
+                }
+                return request->Finish();
+            }
+        }
+
+        return std::make_unique<TCoreHttpResponse>(std::move(context), std::move(response));
+    }
+
+    // Starts a POST/PUT request whose body is written by the caller through
+    // the returned object's stream. The THttpConfig argument is ignored.
+    IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header) override
+    {
+        TCoreRequestContext context = CreateContext(url, requestId, header);
+
+        LogRequest(header, url, true, requestId, context.HostName);
+        context.LoggedAttributes = GetLoggedAttributes(header, url, true, LoggedAttributesSizeLimit);
+
+        auto headers = header.GetHeader(context.HostName, requestId, true).Get();
+        auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
+
+        return std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
+    }
+
+private:
+    // Size limit passed to GetLoggedAttributes for every request.
+    static constexpr int LoggedAttributesSizeLimit = 128;
+
+    // Fills the context for a new request: URL, request id, host name parsed
+    // from the URL, start time, and LogResponse (true only when the header's
+    // output format is text YSON).
+    TCoreRequestContext CreateContext(const TString& url, const TString& requestId, const THttpHeader& header)
+    {
+        TCoreRequestContext context;
+        context.Url = url;
+        context.RequestId = requestId;
+
+        auto urlRef = NHttp::ParseUrl(url);
+        context.HostName = CreateHost(urlRef.Host, urlRef.PortStr);
+
+        context.LogResponse = false;
+        auto outputFormat = header.GetOutputFormat();
+        if (outputFormat && outputFormat->IsTextYson()) {
+            context.LogResponse = true;
+        }
+        context.StartTime = TInstant::Now();
+        return context;
+    }
+
+    // Performs a whole GET/POST/PUT request; `body` is not used for GET.
+    // Any other method is reported through YT_LOG_FATAL.
+    NHttp::IResponsePtr RequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers, const TSharedRef& body)
+    {
+        if (method == "GET") {
+            return Client_->Get(url, headers).Get().ValueOrThrow();
+        } else if (method == "POST") {
+            return Client_->Post(url, body, headers).Get().ValueOrThrow();
+        } else if (method == "PUT") {
+            return Client_->Put(url, body, headers).Get().ValueOrThrow();
+        } else {
+            YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
+                method,
+                url);
+        }
+    }
+
+    // Starts a streaming POST/PUT request.
+    // Any other method is reported through YT_LOG_FATAL.
+    NHttp::IActiveRequestPtr StartRequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers)
+    {
+        if (method == "POST") {
+            return Client_->StartPost(url, headers).Get().ValueOrThrow();
+        } else if (method == "PUT") {
+            return Client_->StartPut(url, headers).Get().ValueOrThrow();
+        } else {
+            YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
+                method,
+                url);
+        }
+    }
+
+    NConcurrency::IThreadPoolPollerPtr Poller_;
+    NHttp::IClientPtr Client_;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+// Factory for the THttpRequest-based client (TDefaultHttpClient).
+IHttpClientPtr CreateDefaultHttpClient()
+{
+    IHttpClientPtr client = std::make_shared<TDefaultHttpClient>();
+    return client;
+}
+
+// Factory for the NHttp/NHttps-based client (TCoreHttpClient); `useTLS`
+// selects the HTTPS client, `config` is forwarded to the constructor.
+IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config)
+{
+    IHttpClientPtr client = std::make_shared<TCoreHttpClient>(useTLS, config);
+    return client;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NHttpClient
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
new file mode 100644
index 00000000000..859f0423cb4
--- /dev/null
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <yt/cpp/mapreduce/interface/fwd.h>
+
+#include <util/datetime/base.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+#include <util/stream/fwd.h>
+
+#include <memory>
+
+namespace NYT::NHttpClient {
+
+///////////////////////////////////////////////////////////////////////////////
+
+struct THttpConfig // Per-request settings accepted by IHttpClient::Request / StartRequest.
+{
+ TDuration SocketTimeout = TDuration::Zero(); // TDefaultHttpClient forwards it to THttpRequest::Connect; TCoreHttpClient currently ignores the whole config (TODO YT-18363 in http_client.cpp).
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+class IHttpResponse // Abstract response handle returned by IHttpClient::Request and IHttpRequest::Finish.
+{
+public:
+ virtual ~IHttpResponse() = default;
+
+ virtual int GetStatusCode() = 0; // HTTP status code as a plain int.
+ virtual IInputStream* GetResponseStream() = 0; // Non-owning pointer; in both implementations in http_client.cpp the stream is owned by the response object.
+ virtual TString GetResponse() = 0; // Response body as a single string.
+ virtual TString GetRequestId() const = 0;
+};
+
+class IHttpRequest // Abstract handle of a started request whose body is written by the caller.
+{
+public:
+ virtual ~IHttpRequest() = default;
+
+ virtual IOutputStream* GetStream() = 0; // Non-owning pointer to the stream for the request body.
+ virtual IHttpResponsePtr Finish() = 0; // Completes the request and returns its response; both implementations in http_client.cpp move their state into the response, so presumably single-use - confirm with callers.
+};
+
+
+// Abstract HTTP client. Implementations provide the overloads that take a
+// THttpConfig; the overloads without it forward a default-constructed config.
+class IHttpClient
+{
+public:
+    virtual ~IHttpClient() = default;
+
+    // Performs a request with an optional body and returns its response.
+    virtual IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body = {}) = 0;
+
+    // Same as above, with a default THttpConfig.
+    virtual IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpHeader& header, TMaybe<TStringBuf> body = {})
+    {
+        const THttpConfig defaultConfig{};
+        return Request(url, requestId, defaultConfig, header, body);
+    }
+
+    // Starts a request whose body is written through the returned object.
+    virtual IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) = 0;
+
+    // Same as above, with a default THttpConfig.
+    virtual IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpHeader& header)
+    {
+        const THttpConfig defaultConfig{};
+        return StartRequest(url, requestId, defaultConfig, header);
+    }
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+IHttpClientPtr CreateDefaultHttpClient(); // Returns the THttpRequest-based client (TDefaultHttpClient in http_client.cpp).
+
+IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config); // Returns the NHttp/NHttps-based client; `useTLS` selects HTTPS, `config->ConnectionPoolSize` sets the idle connection limit.
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NHttpClient
diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp
new file mode 100644
index 00000000000..7cf0f673bb4
--- /dev/null
+++ b/yt/cpp/mapreduce/http/requests.cpp
@@ -0,0 +1,66 @@
+#include "requests.h"
+
+#include "context.h"
+#include "host_manager.h"
+#include "retry_request.h"
+
+#include <yt/cpp/mapreduce/client/transaction.h>
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/node_builder.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
+
+#include <util/stream/file.h>
+#include <util/string/builder.h>
+#include <util/generic/buffer.h>
+
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Parses `response` as YSON and converts the resulting node with GetBool.
+bool ParseBoolFromResponse(const TString& response)
+{
+    bool value = GetBool(NodeFromYsonString(response));
+    return value;
+}
+
+// Parses `response` as a YSON string node and converts its value with GetGuid.
+TGUID ParseGuidFromResponse(const TString& response)
+{
+    auto parsed = NodeFromYsonString(response);
+    const auto& guidString = parsed.AsString();
+    return GetGuid(guidString);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Picks the host for a heavy request: asks THostManager when
+// `context.Config->UseHosts` is set, otherwise uses `context.ServerName`.
+TString GetProxyForHeavyRequest(const TClientContext& context)
+{
+    if (context.Config->UseHosts) {
+        return NPrivate::THostManager::Get().GetProxyForHeavyRequest(context);
+    }
+
+    return context.ServerName;
+}
+
+void LogRequestError( // Writes one error-level log line describing a failed request attempt.
+ const TString& requestId,
+ const THttpHeader& header,
+ const TString& message,
+ const TString& attemptDescription)
+{
+ YT_LOG_ERROR("RSP %v - %v - %v - %v - X-YT-Parameters: %v",
+ requestId,
+ header.GetUrl(),
+ message,
+ attemptDescription,
+ NodeToYsonString(header.GetParameters())); // Header parameters serialized to YSON.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/requests.h b/yt/cpp/mapreduce/http/requests.h
new file mode 100644
index 00000000000..2c692475d1c
--- /dev/null
+++ b/yt/cpp/mapreduce/http/requests.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "fwd.h"
+#include "http.h"
+
+#include <util/generic/maybe.h>
+#include <util/str_stl.h>
+
+namespace NYT {
+
+///////////////////////////////////////////////////////////////////////////////
+
+bool ParseBoolFromResponse(const TString& response); // Parses `response` as YSON and converts the node with GetBool (requests.cpp).
+
+TGUID ParseGuidFromResponse(const TString& response); // Parses `response` as a YSON string node and converts it with GetGuid (requests.cpp).
+
+////////////////////////////////////////////////////////////////////////////////
+
+TString GetProxyForHeavyRequest(const TClientContext& context); // Returns context.ServerName unless context.Config->UseHosts is set, in which case THostManager chooses the proxy.
+
+void LogRequestError( // Logs one error line with the request id, URL, message, attempt description and YSON-serialized header parameters.
+ const TString& requestId,
+ const THttpHeader& header,
+ const TString& message,
+ const TString& attemptDescription);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
new file mode 100644
index 00000000000..ba116edcf7a
--- /dev/null
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -0,0 +1,149 @@
+#include "retry_request.h"
+
+#include "context.h"
+#include "helpers.h"
+#include "http_client.h"
+#include "requests.h"
+
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+namespace NYT {
+namespace NDetail {
+
+///////////////////////////////////////////////////////////////////////////////
+
+// Performs a single attempt: picks the host (heavy proxy or the main server),
+// sends the request through the context's HTTP client and collects the
+// response body and status code.
+static TResponseInfo Request(
+    const TClientContext& context,
+    THttpHeader& header,
+    TMaybe<TStringBuf> body,
+    const TString& requestId,
+    const TRequestConfig& config)
+{
+    TString hostName = config.IsHeavy
+        ? GetProxyForHeavyRequest(context)
+        : context.ServerName;
+
+    auto url = GetFullUrl(hostName, context, header);
+
+    auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
+
+    // The body is read before the status code, as in the original order.
+    TResponseInfo info;
+    info.RequestId = requestId;
+    info.Response = response->GetResponse();
+    info.HttpCode = response->GetStatusCode();
+    return info;
+}
+
+// Sends the request exactly once: sets up authentication and impersonation
+// on `header`, refreshes the mutation id if the header has one, and performs
+// a single attempt under a freshly generated request id.
+TResponseInfo RequestWithoutRetry(
+    const TClientContext& context,
+    THttpHeader& header,
+    TMaybe<TStringBuf> body,
+    const TRequestConfig& config)
+{
+    // A service ticket, when configured, takes precedence over the token.
+    if (!context.ServiceTicketAuth) {
+        header.SetToken(context.Token);
+    } else {
+        header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+    }
+
+    if (context.ImpersonationUser) {
+        header.SetImpersonationUser(*context.ImpersonationUser);
+    }
+
+    if (header.HasMutationId()) {
+        header.RemoveParameter("retry");
+        header.AddMutationId();
+    }
+
+    return Request(context, header, body, CreateGuidAsString(), config);
+}
+
+
+TResponseInfo RetryRequestWithPolicy( // Repeats Request() until it returns, or until the error is non-retriable or the policy yields no retry timeout.
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ THttpHeader& header,
+ TMaybe<TStringBuf> body,
+ const TRequestConfig& config)
+{
+ if (context.ServiceTicketAuth) { // A service ticket, when configured, is used instead of the token.
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+
+ if (context.ImpersonationUser) {
+ header.SetImpersonationUser(*context.ImpersonationUser);
+ }
+
+ bool useMutationId = header.HasMutationId();
+ bool retryWithSameMutationId = false; // Set by the catch blocks below; decides whether the next attempt reuses the mutation id.
+
+ if (!retryPolicy) { // No policy given: use the default one built from context.Config.
+ retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ }
+
+ while (true) {
+ auto requestId = CreateGuidAsString(); // Fresh request id for every attempt.
+ try {
+ retryPolicy->NotifyNewAttempt();
+
+ if (useMutationId) {
+ if (retryWithSameMutationId) { // Keep the mutation id and mark the request as a retry.
+ header.AddParameter("retry", true, /* overwrite = */ true);
+ } else { // Otherwise drop the retry mark and generate a new mutation id.
+ header.RemoveParameter("retry");
+ header.AddMutationId();
+ }
+ }
+
+ return Request(context, header, body, requestId, config);
+ } catch (const TErrorResponse& e) { // Listed before the std::exception handler so error responses get their own handling.
+ LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription());
+ retryWithSameMutationId = e.IsTransportError(); // The mutation id is reused only for transport errors.
+
+ if (!IsRetriable(e)) {
+ throw;
+ }
+
+ auto maybeRetryTimeout = retryPolicy->OnRetriableError(e);
+ if (maybeRetryTimeout) {
+ TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
+ } else { // The policy returned no timeout: rethrow the current error.
+ throw;
+ }
+ } catch (const std::exception& e) {
+ LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription());
+ retryWithSameMutationId = true; // Generic failure: always retry with the same mutation id.
+
+ if (!IsRetriable(e)) {
+ throw;
+ }
+
+ auto maybeRetryTimeout = retryPolicy->OnGenericError(e);
+ if (maybeRetryTimeout) {
+ TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
+ } else { // The policy returned no timeout: rethrow the current error.
+ throw;
+ }
+ }
+ }
+
+ Y_FAIL("Retries must have either succeeded or thrown an exception"); // Not reachable: the loop above exits only via return or throw.
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h
new file mode 100644
index 00000000000..2210e318f10
--- /dev/null
+++ b/yt/cpp/mapreduce/http/retry_request.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include "fwd.h"
+
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/common/fwd.h>
+
+#include <yt/cpp/mapreduce/http/http_client.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////
+
+struct TResponseInfo // Result of a request as filled in by Request() in retry_request.cpp.
+{
+ TString RequestId; // Id the attempt was sent with.
+ TString Response; // Value of IHttpResponse::GetResponse().
+ int HttpCode = 0; // Value of IHttpResponse::GetStatusCode(); 0 until filled in.
+};
+
+////////////////////////////////////////////////////////////////////
+
+struct TRequestConfig // Options for RetryRequestWithPolicy / RequestWithoutRetry.
+{
+ NHttpClient::THttpConfig HttpConfig; // Passed through to IHttpClient::Request.
+ bool IsHeavy = false; // When true the host comes from GetProxyForHeavyRequest(context) instead of context.ServerName.
+};
+
+////////////////////////////////////////////////////////////////////
+
+// Retry request with given `header` and `body` using `retryPolicy`.
+// If `retryPolicy == nullptr`, the default policy from `CreateDefaultRequestRetryPolicy(context.Config)` is used.
+TResponseInfo RetryRequestWithPolicy(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ THttpHeader& header,
+ TMaybe<TStringBuf> body = {},
+ const TRequestConfig& config = TRequestConfig());
+
+TResponseInfo RequestWithoutRetry( // Single attempt, no retry policy; sets auth/impersonation on `header` and refreshes the mutation id if the header has one.
+ const TClientContext& context,
+ THttpHeader& header,
+ TMaybe<TStringBuf> body = {},
+ const TRequestConfig& config = TRequestConfig());
+
+////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/http/ya.make b/yt/cpp/mapreduce/http/ya.make
new file mode 100644
index 00000000000..ef81a4b64a7
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ya.make
@@ -0,0 +1,29 @@
+LIBRARY()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ abortable_http_response.cpp
+ context.cpp
+ helpers.cpp
+ host_manager.cpp
+ http.cpp
+ http_client.cpp
+ requests.cpp
+ retry_request.cpp
+)
+
+PEERDIR(
+ library/cpp/deprecated/atomic
+ library/cpp/http/io
+ library/cpp/string_utils/base64
+ library/cpp/string_utils/quote
+ library/cpp/threading/cron
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/interface
+ yt/cpp/mapreduce/interface/logging
+ yt/yt/core/http
+ yt/yt/core/https
+)
+
+END()