aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http
diff options
context:
space:
mode:
authorannashest18 <annashest18@yandex-team.com>2023-10-22 13:19:53 +0300
committerannashest18 <annashest18@yandex-team.com>2023-10-22 13:40:13 +0300
commit24a44df2ab01109cefd64d4bfad05c66e7ebd2af (patch)
tree1ec1ca95a2a8aceaa383fde2e19e61db9ab4fe02 /yt/cpp/mapreduce/http
parenta991b019f74cbfe6efb40c3b68b46a1ac2a9bd09 (diff)
downloadydb-24a44df2ab01109cefd64d4bfad05c66e7ebd2af.tar.gz
add using http-proxy for reading table from YT
add using http-proxy for reading table from YT Нам нужна возможность ходить в YT через HTTP proxy для чтения таблиц, используя С++ клиент не из контура Яндекса, к сожалению, сейчас такой возможности нет. В этом ПР черновик изменения, которого нам достаточно https://a.yandex-team.ru/review/4676436/details - тут это же изменение в YT + коммит с тем, как мы планируем использовать
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r--yt/cpp/mapreduce/http/context.cpp3
-rw-r--r--yt/cpp/mapreduce/http/context.h1
-rw-r--r--yt/cpp/mapreduce/http/helpers.cpp17
-rw-r--r--yt/cpp/mapreduce/http/helpers.h4
-rw-r--r--yt/cpp/mapreduce/http/host_manager.cpp6
-rw-r--r--yt/cpp/mapreduce/http/http.cpp25
-rw-r--r--yt/cpp/mapreduce/http/http.h6
-rw-r--r--yt/cpp/mapreduce/http/requests.cpp2
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp6
9 files changed, 60 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/http/context.cpp b/yt/cpp/mapreduce/http/context.cpp
index 1c016263c5..d158a55195 100644
--- a/yt/cpp/mapreduce/http/context.cpp
+++ b/yt/cpp/mapreduce/http/context.cpp
@@ -12,7 +12,8 @@ bool operator==(const TClientContext& lhs, const TClientContext& rhs)
lhs.ServiceTicketAuth == rhs.ServiceTicketAuth &&
lhs.HttpClient == rhs.HttpClient &&
lhs.UseTLS == rhs.UseTLS &&
- lhs.TvmOnly == rhs.TvmOnly;
+ lhs.TvmOnly == rhs.TvmOnly &&
+ lhs.ProxyAddress == rhs.ProxyAddress;
}
bool operator!=(const TClientContext& lhs, const TClientContext& rhs)
diff --git a/yt/cpp/mapreduce/http/context.h b/yt/cpp/mapreduce/http/context.h
index 3926373e17..9c730f1c3a 100644
--- a/yt/cpp/mapreduce/http/context.h
+++ b/yt/cpp/mapreduce/http/context.h
@@ -21,6 +21,7 @@ struct TClientContext
bool TvmOnly = false;
bool UseTLS = false;
TConfigPtr Config = TConfig::Get();
+ TMaybe<TString> ProxyAddress;
};
bool operator==(const TClientContext& lhs, const TClientContext& rhs);
diff --git a/yt/cpp/mapreduce/http/helpers.cpp b/yt/cpp/mapreduce/http/helpers.cpp
index 8a5fe16138..18f74b5958 100644
--- a/yt/cpp/mapreduce/http/helpers.cpp
+++ b/yt/cpp/mapreduce/http/helpers.cpp
@@ -41,6 +41,23 @@ TString GetFullUrl(const TString& hostName, const TClientContext& context, THttp
return Format("http://%v%v", hostName, header.GetUrl());
}
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{
+ if (context.ProxyAddress) {
+ header.SetHostPort(Format("http://%v", hostName));
+ header.SetProxyAddress(*context.ProxyAddress);
+ }
+}
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header)
+{
+ if (context.ProxyAddress) {
+ THttpHeader emptyHeader(header.GetMethod(), "", false);
+ return GetFullUrl(*context.ProxyAddress, context, emptyHeader);
+ }
+ return GetFullUrl(hostName, context, header);
+}
+
static TString GetParametersDebugString(const THttpHeader& header)
{
const auto& parameters = header.GetParameters();
diff --git a/yt/cpp/mapreduce/http/helpers.h b/yt/cpp/mapreduce/http/helpers.h
index 0c510fa2e8..82ef799ca1 100644
--- a/yt/cpp/mapreduce/http/helpers.h
+++ b/yt/cpp/mapreduce/http/helpers.h
@@ -14,6 +14,10 @@ TString CreateHostNameWithPort(const TString& name, const TClientContext& contex
TString GetFullUrl(const TString& hostName, const TClientContext& context, THttpHeader& header);
+void UpdateHeaderForProxyIfNeed(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
+TString GetFullUrlForProxy(const TString& hostName, const TClientContext& context, THttpHeader& header);
+
TString TruncateForLogs(const TString& text, size_t maxSize);
TString GetLoggedAttributes(const THttpHeader& header, const TString& url, bool includeParameters, size_t sizeLimit);
diff --git a/yt/cpp/mapreduce/http/host_manager.cpp b/yt/cpp/mapreduce/http/host_manager.cpp
index a239dde769..e9716ec7a6 100644
--- a/yt/cpp/mapreduce/http/host_manager.cpp
+++ b/yt/cpp/mapreduce/http/host_manager.cpp
@@ -94,7 +94,7 @@ void THostManager::Reset()
TString THostManager::GetProxyForHeavyRequest(const TClientContext& context)
{
- auto cluster = context.ServerName;
+ auto cluster = context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
{
auto guard = Guard(Lock_);
auto it = ClusterHosts_.find(cluster);
@@ -121,10 +121,10 @@ THostManager::TClusterHostList THostManager::GetHosts(const TClientContext& cont
THttpHeader header("GET", hostsEndpoint, false);
try {
- auto hostName = context.ServerName;
auto requestId = CreateGuidAsString();
// TODO: we need to set socket timeout here
- auto response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
+ auto response = context.HttpClient->Request(GetFullUrlForProxy(context.ServerName, context, header), requestId, header);
auto hosts = ParseJsonStringArray(response->GetResponse());
for (auto& host : hosts) {
host = CreateHostNameWithPort(host, context);
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index c5c358c46c..987b6daf4a 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -210,6 +210,16 @@ void THttpHeader::SetToken(const TString& token)
Token = token;
}
+void THttpHeader::SetProxyAddress(const TString& proxyAddress)
+{
+ ProxyAddress = proxyAddress;
+}
+
+void THttpHeader::SetHostPort(const TString& hostPort)
+{
+ HostPort = hostPort;
+}
+
void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
{
ImpersonationUser = impersonationUser;
@@ -250,10 +260,19 @@ TString THttpHeader::GetCommand() const
return Command;
}
-TString THttpHeader::GetUrl() const
+TString THttpHeader::GetUrl(bool needProxy) const
{
TStringStream url;
+ if (needProxy && !ProxyAddress.empty()) {
+ url << ProxyAddress << "/";
+ return url.Str();
+ }
+
+ if (!ProxyAddress.empty()) {
+ url << HostPort;
+ }
+
if (IsApi) {
url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
} else {
@@ -274,7 +293,7 @@ TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& r
result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
- GetHeader(hostName, requestId, includeParameters).Get()->WriteTo(&result);
+ GetHeader(HostPort.Empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
if (ShouldAcceptFraming()) {
result << "X-YT-Accept-Framing: 1\r\n";
@@ -914,7 +933,7 @@ void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
{
auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
- Url_ = header.GetUrl();
+ Url_ = header.GetUrl(true);
LogRequest(header, Url_, includeParameters, RequestId, HostName);
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index ee8783088d..8bfa00de1d 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -57,6 +57,8 @@ public:
bool HasMutationId() const;
void SetToken(const TString& token);
+ void SetProxyAddress(const TString& proxyAddress);
+ void SetHostPort(const TString& hostPort);
void SetImpersonationUser(const TString& impersonationUser);
void SetServiceTicket(const TString& ticket);
@@ -70,7 +72,7 @@ public:
void SetResponseCompression(const TString& compression);
TString GetCommand() const;
- TString GetUrl() const;
+ TString GetUrl(bool needProxy = false) const;
TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
@@ -89,6 +91,8 @@ private:
TString Token;
TString ServiceTicket;
TNode Attributes;
+ TString ProxyAddress;
+ TString HostPort;
private:
TMaybe<TFormat> InputFormat = TFormat::YsonText();
diff --git a/yt/cpp/mapreduce/http/requests.cpp b/yt/cpp/mapreduce/http/requests.cpp
index 7cf0f673bb..7d95a10bc2 100644
--- a/yt/cpp/mapreduce/http/requests.cpp
+++ b/yt/cpp/mapreduce/http/requests.cpp
@@ -41,7 +41,7 @@ TGUID ParseGuidFromResponse(const TString& response)
TString GetProxyForHeavyRequest(const TClientContext& context)
{
if (!context.Config->UseHosts) {
- return context.ServerName;
+ return context.ProxyAddress ? *context.ProxyAddress : context.ServerName;
}
return NPrivate::THostManager::Get().GetProxyForHeavyRequest(context);
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 383b3f16d5..63a36c2f78 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -34,7 +34,9 @@ static TResponseInfo Request(
hostName = context.ServerName;
}
- auto url = GetFullUrl(hostName, context, header);
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
+ auto url = GetFullUrlForProxy(hostName, context, header);
auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
@@ -83,6 +85,8 @@ TResponseInfo RetryRequestWithPolicy(
header.SetToken(context.Token);
}
+ UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
+
if (context.ImpersonationUser) {
header.SetImpersonationUser(*context.ImpersonationUser);
}